Как получить имя файла при использовании сопоставления с шаблоном файла в google-cloud-dataflow
Кто-то знает, как получить имя файла при использовании сопоставления шаблонов файлов в google-cloud-dataflow?
Я новичок в использовании потока данных. Как получить имя файла при использовании совпадения файлов, таким образом.
p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))
Я хотел бы узнать, как определить имя файла: kinglear.txt,Hamlet.txt и т. Д.
6 ответов
Если вы хотите просто развернуть файл шаблона и получить список имен файлов, соответствующих ему, вы можете использовать GcsIoChannelFactory.match("gs://dataflow-samples/shakespeare/*.txt")
(см. GcsIoChannelFactory).
Если вы хотите получить доступ к "текущему имени файла" изнутри одного из нижестоящих потоков DoFn в вашем конвейере - это в настоящее время не поддерживается (хотя есть некоторые обходные пути - см. Ниже). Это общий запрос, и мы все еще думаем о том, как лучше всего вписать его в структуру естественным, универсальным и высокопроизводительным способом.
Некоторые обходные пути включают в себя:
- Пишем конвейер следующим образом (пример tf-idf использует этот подход):
DoFn readFile =... (принимает имя файла, читает файл и создает записи)... p.apply(Create.of(имена файлов)) .Нанесите (ParDo.of(ReadFile)) .apply(остальная часть вашего конвейера)
Это имеет недостаток, заключающийся в том, что функции динамической перебалансировки работы не будут работать особенно хорошо, потому что в настоящее время они применяются только на уровне Read PTransform, но не на уровне ParDo с большим разветвлением (как здесь, который будет читать подать и произвести все записи); и распараллеливание будет работать только на уровне файлов, но файлы не будут разбиты на поддиапазоны. При масштабах чтения Шекспира это не проблема, но если вы читаете набор файлов совершенно разного размера, в том числе очень больших, то это может стать проблемой.
- Реализация своего
FileBasedSource
( Javadoc, общая документация), которая будет возвращать записи типа что-то вродеPair<String, T>
гдеString
это имя файла иT
это запись, которую вы читаете. В этом случае фреймворк будет обрабатывать сопоставление файлового шаблона для вас, динамическая перебалансировка работы будет работать просто отлично, однако вы должны написать логику чтения в своемFileBasedReader
,
Оба этих обходных пути не идеальны, но в зависимости от ваших требований, один из них может помочь вам.
Обновление на основе последней версии SDK Java (SDK 2.9.0):
Считыватели TextIO Beams не предоставляют доступ к самому имени файла, для этих случаев использования нам необходимо использовать FileIO для сопоставления файлов и получения доступа к информации, хранящейся в имени файла. В отличие от TextIO, чтение файла должно осуществляться пользователем в преобразованиях после чтения FileIO. Результатом чтения FileIO является PCollection. Класс ReadableFile содержит имя файла в качестве метаданных, которое может использоваться вместе с содержимым файла.
FileIO имеет удобный метод readFullyAsUTF8String(), который будет считывать весь файл в объект String, который сначала будет считывать весь файл в память. Если проблема связана с памятью, вы можете напрямую работать с файлом с помощью служебных классов, таких как FileSystems.
PCollection<KV<String, String>> filesAndContents = p
.apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
// withCompression can be omitted - by default compression is detected from the filename.
.apply(FileIO.readMatches().withCompression(GZIP))
.apply(MapElements
// uses imports from TypeDescriptors
.into(KVs(strings(), strings()))
.via((ReadableFile f) -> KV.of(
f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));
Python (SDK 2.9.0):
Для версии 2.9.0 для python вам необходимо собрать список URI вне конвейера потока данных и передать его в качестве параметра в конвейер. Например, использование FileSystems для чтения в списке файлов с помощью шаблона Glob, а затем передача его в PCollection для обработки.
У меня также было 100 входных файлов = 100 узлов на диаграмме потока данных при использовании кода, похожего на @danvk. Я переключился на такой подход, который привел к тому, что все чтения были объединены в один блок, который вы можете развернуть, чтобы развернуть каждый файл / каталог, который был прочитан. Работа также выполнялась быстрее с использованием этого подхода, а не подхода Lists.transform в нашем случае использования.
GcsOptions gcsOptions = options.as(GcsOptions.class);
List<GcsPath> paths = gcsOptions.getGcsUtil().expand(GcsPath.fromUri(options.getInputFile()));
List<String>filesToProcess = paths.stream().map(item -> item.toString()).collect(Collectors.toList());
PCollectionList<SomeClass> pcl = PCollectionList.empty(p);
for(String fileName : filesToProcess) {
pcl = pcl.and(
p.apply("ReadAvroFile" + fileName, AvroIO.Read.named("ReadFromAvro")
.from(fileName)
.withSchema(SomeClass.class)
)
.apply(ParDo.of(new MyDoFn(fileName)))
);
}
// flatten the PCollectionList, combining all the PCollections together
PCollection<SomeClass> flattenedPCollection = pcl.apply(Flatten.pCollections());
Одним из подходов является создание List<PCollection>
где каждая запись соответствует входному файлу, затем используйте Flatten
, Например, если вы хотите проанализировать каждую строку коллекции файлов в Foo
объект, вы можете сделать что-то вроде этого:
public static class FooParserFn extends DoFn<String, Foo> {
private String fileName;
public FooParserFn(String fileName) {
this.fileName = fileName;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
String line = processContext.element();
// here you have access to both the line of text and the name of the file
// from which it came.
}
}
public static void main(String[] args) {
...
List<String> inputFiles = ...;
List<PCollection<Foo>> foosByFile =
Lists.transform(inputFiles,
new Function<String, PCollection<Foo>>() {
@Override
public PCollection<Foo> apply(String fileName) {
return p.apply(TextIO.Read.from(fileName))
.apply(new ParDo().of(new FooParserFn(fileName)));
}
});
PCollection<Foo> foos = PCollectionList.<Foo>empty(p).and(foosByFile).apply(Flatten.<Foo>pCollections());
...
}
Недостатком этого подхода является то, что если у вас есть 100 входных файлов, у вас также будет 100 узлов в консоли мониторинга Cloud Dataflow. Это затрудняет понимание того, что происходит. Мне было бы интересно услышать от людей из Google Cloud Dataflow, эффективен ли этот подход.
Я боролся с тем же вариантом использования при использовании подстановочного знака для чтения файлов из GCS, но мне также нужно было изменить коллекцию на основе имени файла. Ключ в том, чтобы использовать ReadFromTextWithFilename вместо readfromtext. В java у вас уже есть выход, и вы можете использовать : Строковое имя файла =context.element().getMetadata().resourceId().getCurrentDirectory().toString() внутри вашего метода processElement.
Но для Python будет работать следующая техника:-> Используйте beam.io.ReadFromTextWithFilename для чтения подстановочного пути из GCS-> Согласно документу, ReadFromTextWithFilename возвращает имя файла и содержимое файла.
Ниже приведен фрагмент кода:
class GetFileNameFromWildcard(beam.DoFn):
def process(self, element, *args, **kwargs):
file_path, content = element
schema = ["id","name","mob","email","dept","store"]
store_name = file_path.split("/")[-2]
content_list = content.split(",")
content_list.append(store_name)
out_dict = dict(zip(schema,content_list))
print(out_dict)
yield out_dict
def run():
pipeline_options = PipelineOptions()
with beam.Pipeline(options=pipeline_options) as p:
# saving main session so that it can load global namespace on the Cloud Dataflow Worker
init = p | 'Begin Pipeline With Initiator' >> beam.Create(
["pcollection initializer"]) | 'Read From GCS' >> beam.io.ReadFromTextWithFilename(
"gs://<bkt-name>/20220826/*/dlp*", skip_header_lines=1) | beam.ParDo(
GetFileNameFromWildcard()) | beam.io.WriteToText(
'df_out.csv')
Это может быть очень поздняя публикация по вышеуказанному вопросу, но я хотел добавить ответ с помощью классов, связанных с Beam.
Это также можно рассматривать как извлеченный код из решения, предоставленного @Reza Rokni.
PCollection<String> listOfFilenames =
pipe.apply(FileIO.match().filepattern("gs://apache-beam-samples/shakespeare/*"))
.apply(FileIO.readMatches())
.apply(
MapElements.into(TypeDescriptors.strings())
.via(
(FileIO.ReadableFile file) -> {
String f = file.getMetadata().resourceId().getFilename();
System.out.println(f);
return f;
}));
pipe.run().waitUntilFinish();
Над PCollection<String>
будет иметь список файлов, доступных в любом указанном каталоге.