AbstractFileOutputWriter Генерация дубликатов файлов tmp
У меня есть приложение Apache Apex, которое использует Kafka Logs и записывает его в HDFS.
Группа обеспечения доступности баз данных достаточно проста, так что есть потребитель Kafka (20 разделов памяти по 2 ГБ для оператора), подключенный потоком к "MyWriter extends AbstractFileOutputOperator".
Проблема: 1. Я видел, как Writer неоднократно писал файлы.tmp с одинаковым размером и одними и теми же данными много раз. Я попытался увеличить память Оператора записи, увеличил количество разделов Writer и т. Д. Тем не менее эта проблема продолжает происходить.
Я попытался добавить / удалить requestFinalize для MyWriter. Все та же проблема.
@Override
public void endWindow()
{
if (null != fileName) {
requestFinalize(fileName);
}
super.endWindow();
}
Это подмножество моих properties.xml
<property>
<name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
<value>1000</value>
</property>
<property>
<name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name>
<value>60</value>
</property>
<property>
<name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name>
<value>60</value>
</property>
<property>
<name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name>
<value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value>
</property>
<property>
<name>dt.application.myapp.operator.myWriter.prop.maxLength</name>
<value>1000000000</value> <!-- 1 GB File -->
</property>
Это трассировка стека, которую я смог получить из dt.log для оператора: оператор перераспределяется, вероятно, в разных константах, выдает это исключение и продолжает записывать дубликаты файлов.
java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
at com.datatorrent.stram.engine.Node.setup(Node.java:187)
at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
1 ответ
Код для базового оператора находится по следующей ссылке и упоминается в комментариях ниже: https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java
Устанавливая максимальный размер файла в 1 ГБ, вы автоматически включаете переходящие файлы; соответствующие поля:
protected Long maxLength = Long.MAX_VALUE;
protected transient boolean rollingFile = false;
Последнее установлено в true в setup()
метод, если первый имеет значение меньше значения по умолчанию Long.MAX_VALUE
,
При включении прокрутки файлов финализация файлов выполняется автоматически, поэтому не следует вызывать requestFinalize()
,
Во-вторых, в вашем MyWriter
класс, удалите endWindow()
переопределите и убедитесь, что вы создали желаемое имя файла, которое включает идентификатор оператора в setup()
метод и вернуть это имя файла в getFileName()
переопределить; это гарантирует, что несколько разделителей не наступают друг на друга. Например:
@NotNull
private String fileName; // current base file name
private transient String fName; // per partition file name
@Override
public void setup(Context.OperatorContext context)
{
// create file name for this partition by appending the operator id to
// the base name
//
long id = context.getId();
fName = fileName + "_p" + id;
super.setup(context);
LOG.debug("Leaving setup, fName = {}, id = {}", fName, id);
}
@Override
protected String getFileName(Long[] tuple)
{
return fName;
}
Базовое имя файла (fileName
в приведенном выше коде) может быть установлен непосредственно в коде или инициализирован из свойства в XML-файле (вам необходимо добавить для него также метод получения и установки).
Вы можете увидеть пример использования этого типа по адресу: https://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput
Пара дополнительных предложений:
- Установите количество разделов в 1 (или закомментируйте XML, который устанавливает
PARTITIONER
атрибут) и убедитесь, что все работает как положено. Это устранит любые проблемы, не связанные с разделением. Если возможно, также уменьшите максимальный размер файла, скажем, до 2 КБ или 4 КБ, чтобы тестирование было проще. - Как только работает один раздел, увеличьте количество разделов до 2. Если это работает, произвольные большие числа (в пределах разумного) также должны работать.