Как мне написать несколько файлов в Apache Beam?
Позвольте мне упростить мой случай. Я использую Apache Beam 0.6.0. Мой окончательный обработанный результат PCollection<KV<String, String>>
, И я хочу записать значения в разные файлы, соответствующие их ключам.
Например, скажем, результат состоит из
(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)
Тогда я хочу написать value1
, value3
а также value4
в key1.txt
, и писать value4
в key2.txt
,
И в моем случае:
- Набор ключей определяется при работе конвейера, а не при его создании.
- Набор ключей может быть довольно маленьким, но количество значений, соответствующих каждому ключу, может быть очень очень большим.
Есть идеи?
4 ответа
Удобно, я написал образец этого дела на днях.
Этот пример - стиль потока данных 1.x
В основном вы группируете по каждому ключу, а затем вы можете сделать это с помощью специального преобразования, которое подключается к облачному хранилищу. Примите во внимание, что ваш список строк на файл не должен быть массивным (он должен помещаться в память в одном экземпляре, но, учитывая, что вы можете запускать экземпляры с большим количеством записей, этот предел довольно высок).
...
PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
.apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
readyToWrite.apply(
new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
...
И тогда преобразование, выполняющее большую часть работы:
public class PTransformWriteToGCS
extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {
private static final Logger LOG = Logging.getLogger(PTransformWriteToGCS.class);
private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();
private final String bucketName;
private final SerializableFunction<String, String> pathCreator;
public PTransformWriteToGCS(final String bucketName,
final SerializableFunction<String, String> pathCreator) {
this.bucketName = bucketName;
this.pathCreator = pathCreator;
}
@Override
public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {
return input
.apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {
@Override
public void processElement(
final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
throws Exception {
final String key = arg0.element().getKey();
final List<String> values = arg0.element().getValue();
final String toWrite = values.stream().collect(Collectors.joining("\n"));
final String path = pathCreator.apply(key);
BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
.setContentType(MimeTypes.TEXT)
.build();
LOG.info("blob writing to: {}", blobInfo);
Blob result = STORAGE.create(blobInfo,
toWrite.getBytes(StandardCharsets.UTF_8));
}
}));
}
}
Просто напишите цикл в функции ParDo! Более подробная информация - у меня был тот же сценарий сегодня, единственное, в моем случае это key = image_label и value = image_tf_record. Так что, как вы и просили, я пытаюсь создать отдельные файлы TFRecord, по одному на класс, каждый файл записи содержит количество изображений. ОДНАКО не уверен, что могут быть проблемы с памятью, когда количество значений на ключ очень велико, как в вашем сценарии: (также мой код на Python)
class WriteToSeparateTFRecordFiles(beam.DoFn):
def __init__(self, outdir):
self.outdir = outdir
def process(self, element):
l, image_list = element
writer = tf.python_io.TFRecordWriter(self.outdir + "/tfr" + str(l) + '.tfrecord')
for example in image_list:
writer.write(example.SerializeToString())
writer.close()
А затем в вашем конвейере сразу после этапа, на котором вы получаете пары ключ-значение, добавьте эти две строки:
| 'GroupByLabelId' >> beam.GroupByKey()
| 'SaveToMultipleFiles' >> beam.ParDo(WriteToSeparateTFRecordFiles(opt, p))
Вы можете использовать FileIO.writeDinamic() для этого
PCollection<KV<String,String>> readfile= (something you read..);
readfile.apply(FileIO. <String,KV<String,String >> writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.to("somefolder")
.withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));
p.run();
В Java SDK Apache Beam 2.2 это изначально поддерживается в TextIO
а также AvroIO
используя соответственно TextIO
а также AvroIO.write().to(DynamicDestinations)
, Смотрите, например, этот метод.
Обновление (2018): Предпочитают использовать FileIO.writeDynamic()
вместе с TextIO.sink()
а также AvroIO.sink()
вместо.
Просто напишите ниже строки в вашем классе ParDo:
from apache_beam.io import filesystems eventCSVFileWriter = filesystems.FileSystems.create(gcsFileName) for record in list(Records): eventCSVFileWriter.write(record)
Если вам нужен полный код, я тоже могу вам в этом помочь.