Как мне написать несколько файлов в Apache Beam?

Позвольте мне упростить мой случай. Я использую Apache Beam 0.6.0. Мой окончательный обработанный результат PCollection<KV<String, String>>, И я хочу записать значения в разные файлы, соответствующие их ключам.

Например, скажем, результат состоит из

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

Тогда я хочу написать value1, value3 а также value4 в key1.txt, и писать value4 в key2.txt,

И в моем случае:

  • Набор ключей определяется при работе конвейера, а не при его создании.
  • Набор ключей может быть довольно маленьким, но количество значений, соответствующих каждому ключу, может быть очень очень большим.

Есть идеи?

4 ответа

Удобно, я написал образец этого дела на днях.

Этот пример - стиль потока данных 1.x

В основном вы группируете по каждому ключу, а затем вы можете сделать это с помощью специального преобразования, которое подключается к облачному хранилищу. Примите во внимание, что ваш список строк на файл не должен быть массивным (он должен помещаться в память в одном экземпляре, но, учитывая, что вы можете запускать экземпляры с большим количеством записей, этот предел довольно высок).

...
PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
            .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
    readyToWrite.apply(
            new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
...

И тогда преобразование, выполняющее большую часть работы:

public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

private static final Logger LOG = Logging.getLogger(PTransformWriteToGCS.class);

private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

private final String bucketName;

private final SerializableFunction<String, String> pathCreator;

public PTransformWriteToGCS(final String bucketName,
        final SerializableFunction<String, String> pathCreator) {
    this.bucketName = bucketName;
    this.pathCreator = pathCreator;

}

@Override
public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {

    return input
            .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {

                @Override
                public void processElement(
                        final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
                        throws Exception {
                    final String key = arg0.element().getKey();
                    final List<String> values = arg0.element().getValue();
                    final String toWrite = values.stream().collect(Collectors.joining("\n"));
                    final String path = pathCreator.apply(key);
                    BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                            .setContentType(MimeTypes.TEXT)
                            .build();
                    LOG.info("blob writing to: {}", blobInfo);
                    Blob result = STORAGE.create(blobInfo,
                            toWrite.getBytes(StandardCharsets.UTF_8));
                }
            }));
}

}

Просто напишите цикл в функции ParDo! Более подробная информация - у меня был тот же сценарий сегодня, единственное, в моем случае это key = image_label и value = image_tf_record. Так что, как вы и просили, я пытаюсь создать отдельные файлы TFRecord, по одному на класс, каждый файл записи содержит количество изображений. ОДНАКО не уверен, что могут быть проблемы с памятью, когда количество значений на ключ очень велико, как в вашем сценарии: (также мой код на Python)

class WriteToSeparateTFRecordFiles(beam.DoFn):

def __init__(self, outdir):
    self.outdir = outdir

def process(self, element):
    l, image_list = element
    writer = tf.python_io.TFRecordWriter(self.outdir + "/tfr" + str(l) + '.tfrecord')
    for example in image_list:
        writer.write(example.SerializeToString())
    writer.close()

А затем в вашем конвейере сразу после этапа, на котором вы получаете пары ключ-значение, добавьте эти две строки:

    | 'GroupByLabelId' >> beam.GroupByKey()
    | 'SaveToMultipleFiles' >> beam.ParDo(WriteToSeparateTFRecordFiles(opt, p))

Вы можете использовать FileIO.writeDinamic() для этого

PCollection<KV<String,String>> readfile= (something you read..);

readfile.apply(FileIO. <String,KV<String,String >> writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("somefolder")
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));

p.run();

В Java SDK Apache Beam 2.2 это изначально поддерживается в TextIO а также AvroIO используя соответственно TextIO а также AvroIO.write().to(DynamicDestinations), Смотрите, например, этот метод.

Обновление (2018): Предпочитают использовать FileIO.writeDynamic() вместе с TextIO.sink() а также AvroIO.sink() вместо.

Просто напишите ниже строки в вашем классе ParDo:

from apache_beam.io import filesystems

eventCSVFileWriter = filesystems.FileSystems.create(gcsFileName)
for record in list(Records):
    eventCSVFileWriter.write(record)

Если вам нужен полный код, я тоже могу вам в этом помочь.

Другие вопросы по тегам