AbstractFileOutputWriter Генерация дубликатов файлов tmp

У меня есть приложение Apache Apex, которое использует Kafka Logs и записывает его в HDFS.

Группа обеспечения доступности баз данных достаточно проста, так что есть потребитель Kafka (20 разделов памяти по 2 ГБ для оператора), подключенный потоком к "MyWriter extends AbstractFileOutputOperator".

Проблема: 1. Я видел, как Writer неоднократно писал файлы.tmp с одинаковым размером и одними и теми же данными много раз. Я попытался увеличить память Оператора записи, увеличил количество разделов Writer и т. Д. Тем не менее эта проблема продолжает происходить.

Я попытался добавить / удалить requestFinalize для MyWriter. Все та же проблема.

 @Override
    public void endWindow()
    {
        if (null != fileName) {
            requestFinalize(fileName);
        }
        super.endWindow();
    }

Это подмножество моих properties.xml

<property>
    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
    <value>1000</value>
  </property>

  <property>
    <name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name>
    <value>60</value>
  </property>

  <property>
    <name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name>
     <value>60</value>
  </property>

 <property>
        <name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name>
        <value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value>
    </property>

  <property>
    <name>dt.application.myapp.operator.myWriter.prop.maxLength</name>
    <value>1000000000</value> <!-- 1 GB File -->
  </property>

Это трассировка стека, которую я смог получить из dt.log для оператора: оператор перераспределяется, вероятно, в разных константах, выдает это исключение и продолжает записывать дубликаты файлов.

 java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
        at com.datatorrent.stram.engine.Node.setup(Node.java:187)
        at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
        at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
    Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
        ... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.

1 ответ

Код для базового оператора находится по следующей ссылке и упоминается в комментариях ниже: https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java

Устанавливая максимальный размер файла в 1 ГБ, вы автоматически включаете переходящие файлы; соответствующие поля:

protected Long maxLength = Long.MAX_VALUE;
protected transient boolean rollingFile = false;

Последнее установлено в true в setup() метод, если первый имеет значение меньше значения по умолчанию Long.MAX_VALUE,

При включении прокрутки файлов финализация файлов выполняется автоматически, поэтому не следует вызывать requestFinalize(),

Во-вторых, в вашем MyWriter класс, удалите endWindow() переопределите и убедитесь, что вы создали желаемое имя файла, которое включает идентификатор оператора в setup() метод и вернуть это имя файла в getFileName() переопределить; это гарантирует, что несколько разделителей не наступают друг на друга. Например:

@NotNull
private String fileName;           // current base file name

private transient String fName;    // per partition file name

@Override
public void setup(Context.OperatorContext context)
{
  // create file name for this partition by appending the operator id to
  // the base name
  //
  long id = context.getId();
  fName = fileName + "_p" + id;
  super.setup(context);

  LOG.debug("Leaving setup, fName = {}, id = {}", fName, id);
}

@Override
protected String getFileName(Long[] tuple)
{
  return fName;
}

Базовое имя файла (fileName в приведенном выше коде) может быть установлен непосредственно в коде или инициализирован из свойства в XML-файле (вам необходимо добавить для него также метод получения и установки).

Вы можете увидеть пример использования этого типа по адресу: https://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput

Пара дополнительных предложений:

  1. Установите количество разделов в 1 (или закомментируйте XML, который устанавливает PARTITIONER атрибут) и убедитесь, что все работает как положено. Это устранит любые проблемы, не связанные с разделением. Если возможно, также уменьшите максимальный размер файла, скажем, до 2 КБ или 4 КБ, чтобы тестирование было проще.
  2. Как только работает один раздел, увеличьте количество разделов до 2. Если это работает, произвольные большие числа (в пределах разумного) также должны работать.
Другие вопросы по тегам