Как добавить дополнительное поле к лучу FileIO.matchAll() результат?
У меня есть PCollection KV, где ключом является gcs file_patterns, а значением является некоторая дополнительная информация о файлах (например, системы "Source", которые генерировали файлы). Например,
KV("gs://bucket1/dir1/*", "SourceX"),
KV("gs://bucket1/dir2/*", "SourceY")
Мне нужен PTransferm, чтобы развернуть file_patterns для всех подходящих файлов в папках GCS и сохранить поле "Source". Например, если есть два файла X1.dat, X2.dat в dir1 и один файл (Y1.dat) в dir2, вывод будет:
KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir1/X2.dat", "SourceX")
KV("gs://bucket1/dir2/Y1.dat", "SourceY")
Могу ли я использовать FileIO.matchAll() для достижения этой цели? Я застрял на том, как объединить / присоединить поле "Источник" к соответствующим файлам. Это то, что я пытался, еще не совсем там:
public PCollection<KV<String, String> expand(PCollection<KV<String, String>> filesAndSources) {
return filesAndSources
.apply("Get file names", Keys.create())
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(ParDo.of(
new DoFn<ReadableFile, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
ReadableFile file = c.element();
String fileName = file.getMetadata().resourceId().toString();
c.output(KV.of(fileName, XXXXX)); // How to get the value field ("Source") from the input KV?
Моя трудность заключается в последней строке, для XXXXX, как я могу получить поле значения ("Источник") из входного KV? Любой способ "соединить" или "объединить" значение входного KV обратно с "затраченными" ключами, так как один ключ (file_pattern) расширяется до нескольких значений.
Спасибо!
1 ответ
MatchResult.Medata
содержит resourceId
вы уже используете, но не соответствует пути GCS (с подстановочными знаками), которому он соответствует.
Вы можете достичь того, что вы хотите, используя боковые входы. Чтобы продемонстрировать это, я создал следующее filesAndSources
(согласно вашему комментарию это может быть входной параметр, поэтому он не может быть жестко задан в нисходящем направлении):
PCollection<KV<String, String>> filesAndSources = p.apply("Create file pattern and source pairs",
Create.of(KV.of("gs://" + Bucket + "/sales/*", "Sales"),
KV.of("gs://" + Bucket + "/events/*", "Events")));
Я материализую это в сторону ввода (в данном случае как Map
). Ключ будет шаблоном glob, преобразованным в регулярное выражение (благодаря этому ответу), а значением будет исходная строка:
final PCollectionView<Map<String, String>> regexAndSources =
filesAndSources.apply("Glob pattern to RegEx", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String regex = c.element().getKey();
StringBuilder out = new StringBuilder("^");
for(int i = 0; i < regex.length(); ++i) {
final char ch = regex.charAt(i);
switch(ch) {
case '*': out.append(".*"); break;
case '?': out.append('.'); break;
case '.': out.append("\\."); break;
case '\\': out.append("\\\\"); break;
default: out.append(ch);
}
}
out.append('$');
c.output(KV.of(out.toString(), c.element().getValue()));
}})).apply("Save as Map", View.asMap());
Затем, после чтения имен файлов, мы можем использовать боковой ввод для анализа каждого пути, чтобы увидеть, какая пара соответствует шаблону / источнику:
filesAndSources
.apply("Get file names", Keys.create())
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
ReadableFile file = c.element();
String fileName = file.getMetadata().resourceId().toString();
Set<Map.Entry<String,String>> patternSet = c.sideInput(regexAndSources).entrySet();
for (Map.Entry< String,String> pattern:patternSet)
{
if (fileName.matches(pattern.getKey())) {
String source = pattern.getValue();
c.output(KV.of(fileName, source));
}
}
}}).withSideInputs(regexAndSources))
Обратите внимание, что преобразование регулярных выражений выполняется, когда перед тем, как материализовать боковой ввод вместо здесь, чтобы избежать дублирования работы.
Вывод, как и ожидалось в моем случае:
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/events/*
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/sales/*
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales1.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales2.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events1.csv, value=Events
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events2.csv, value=Events