Как получить имя файла при использовании сопоставления с шаблоном файла в google-cloud-dataflow

Кто-то знает, как получить имя файла при использовании сопоставления шаблонов файлов в google-cloud-dataflow?

Я новичок в использовании потока данных. Как получить имя файла при использовании совпадения файлов, таким образом.

p.apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*.txt"))

Я хотел бы узнать, как определить имя файла: kinglear.txt,Hamlet.txt и т. Д.

6 ответов

Если вы хотите просто развернуть файл шаблона и получить список имен файлов, соответствующих ему, вы можете использовать GcsIoChannelFactory.match("gs://dataflow-samples/shakespeare/*.txt") (см. GcsIoChannelFactory).

Если вы хотите получить доступ к "текущему имени файла" изнутри одного из нижестоящих потоков DoFn в вашем конвейере - это в настоящее время не поддерживается (хотя есть некоторые обходные пути - см. Ниже). Это общий запрос, и мы все еще думаем о том, как лучше всего вписать его в структуру естественным, универсальным и высокопроизводительным способом.

Некоторые обходные пути включают в себя:

  • Пишем конвейер следующим образом (пример tf-idf использует этот подход):
    DoFn readFile =... (принимает имя файла, читает файл и создает записи)...
    p.apply(Create.of(имена файлов))
     .Нанесите (ParDo.of(ReadFile))
     .apply(остальная часть вашего конвейера)

Это имеет недостаток, заключающийся в том, что функции динамической перебалансировки работы не будут работать особенно хорошо, потому что в настоящее время они применяются только на уровне Read PTransform, но не на уровне ParDo с большим разветвлением (как здесь, который будет читать подать и произвести все записи); и распараллеливание будет работать только на уровне файлов, но файлы не будут разбиты на поддиапазоны. При масштабах чтения Шекспира это не проблема, но если вы читаете набор файлов совершенно разного размера, в том числе очень больших, то это может стать проблемой.

  • Реализация своего FileBasedSource ( Javadoc, общая документация), которая будет возвращать записи типа что-то вроде Pair<String, T> где String это имя файла и T это запись, которую вы читаете. В этом случае фреймворк будет обрабатывать сопоставление файлового шаблона для вас, динамическая перебалансировка работы будет работать просто отлично, однако вы должны написать логику чтения в своем FileBasedReader,

Оба этих обходных пути не идеальны, но в зависимости от ваших требований, один из них может помочь вам.

Обновление на основе последней версии SDK Java (SDK 2.9.0):

Считыватели TextIO Beams не предоставляют доступ к самому имени файла, для этих случаев использования нам необходимо использовать FileIO для сопоставления файлов и получения доступа к информации, хранящейся в имени файла. В отличие от TextIO, чтение файла должно осуществляться пользователем в преобразованиях после чтения FileIO. Результатом чтения FileIO является PCollection. Класс ReadableFile содержит имя файла в качестве метаданных, которое может использоваться вместе с содержимым файла.

FileIO имеет удобный метод readFullyAsUTF8String(), который будет считывать весь файл в объект String, который сначала будет считывать весь файл в память. Если проблема связана с памятью, вы можете напрямую работать с файлом с помощью служебных классов, таких как FileSystems.

От: Ссылка на документ

PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(GZIP))
     .apply(MapElements
         // uses imports from TypeDescriptors
         .into(KVs(strings(), strings()))
         .via((ReadableFile f) -> KV.of(
             f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String())));

Python (SDK 2.9.0):

Для версии 2.9.0 для python вам необходимо собрать список URI вне конвейера потока данных и передать его в качестве параметра в конвейер. Например, использование FileSystems для чтения в списке файлов с помощью шаблона Glob, а затем передача его в PCollection для обработки.

У меня также было 100 входных файлов = 100 узлов на диаграмме потока данных при использовании кода, похожего на @danvk. Я переключился на такой подход, который привел к тому, что все чтения были объединены в один блок, который вы можете развернуть, чтобы развернуть каждый файл / каталог, который был прочитан. Работа также выполнялась быстрее с использованием этого подхода, а не подхода Lists.transform в нашем случае использования.

GcsOptions gcsOptions = options.as(GcsOptions.class);
List<GcsPath> paths = gcsOptions.getGcsUtil().expand(GcsPath.fromUri(options.getInputFile()));
List<String>filesToProcess = paths.stream().map(item -> item.toString()).collect(Collectors.toList());

PCollectionList<SomeClass> pcl = PCollectionList.empty(p);
for(String fileName : filesToProcess) {
    pcl = pcl.and(
            p.apply("ReadAvroFile" + fileName, AvroIO.Read.named("ReadFromAvro")
                    .from(fileName)
                    .withSchema(SomeClass.class)
            )
            .apply(ParDo.of(new MyDoFn(fileName)))
    );
}

// flatten the PCollectionList, combining all the PCollections together
PCollection<SomeClass> flattenedPCollection = pcl.apply(Flatten.pCollections());

Одним из подходов является создание List<PCollection> где каждая запись соответствует входному файлу, затем используйте Flatten, Например, если вы хотите проанализировать каждую строку коллекции файлов в Foo объект, вы можете сделать что-то вроде этого:

public static class FooParserFn extends DoFn<String, Foo> {
  private String fileName;
  public FooParserFn(String fileName) {
    this.fileName = fileName;
  }

  @Override
  public void processElement(ProcessContext processContext) throws Exception {
    String line = processContext.element();
    // here you have access to both the line of text and the name of the file
    // from which it came.
  }
}

public static void main(String[] args) {
  ...
  List<String> inputFiles = ...;
  List<PCollection<Foo>> foosByFile =
          Lists.transform(inputFiles,
          new Function<String, PCollection<Foo>>() {
            @Override
            public PCollection<Foo> apply(String fileName) {
              return p.apply(TextIO.Read.from(fileName))
                      .apply(new ParDo().of(new FooParserFn(fileName)));
            }
          });

  PCollection<Foo> foos = PCollectionList.<Foo>empty(p).and(foosByFile).apply(Flatten.<Foo>pCollections());
  ...
}

Недостатком этого подхода является то, что если у вас есть 100 входных файлов, у вас также будет 100 узлов в консоли мониторинга Cloud Dataflow. Это затрудняет понимание того, что происходит. Мне было бы интересно услышать от людей из Google Cloud Dataflow, эффективен ли этот подход.

Я боролся с тем же вариантом использования при использовании подстановочного знака для чтения файлов из GCS, но мне также нужно было изменить коллекцию на основе имени файла. Ключ в том, чтобы использовать ReadFromTextWithFilename вместо readfromtext. В java у вас уже есть выход, и вы можете использовать : Строковое имя файла =context.element().getMetadata().resourceId().getCurrentDirectory().toString() внутри вашего метода processElement.

Но для Python будет работать следующая техника:-> Используйте beam.io.ReadFromTextWithFilename для чтения подстановочного пути из GCS-> Согласно документу, ReadFromTextWithFilename возвращает имя файла и содержимое файла.

Ниже приведен фрагмент кода:

      class GetFileNameFromWildcard(beam.DoFn):
def process(self, element, *args, **kwargs):
    file_path, content = element
    schema = ["id","name","mob","email","dept","store"]
    store_name = file_path.split("/")[-2]
    content_list = content.split(",")
    content_list.append(store_name)
    out_dict = dict(zip(schema,content_list))
    print(out_dict)
    yield out_dict


def run():
pipeline_options = PipelineOptions()
with beam.Pipeline(options=pipeline_options) as p:
    # saving main session so that it can load global namespace on the Cloud Dataflow Worker
    init = p | 'Begin Pipeline With Initiator' >> beam.Create(
        ["pcollection initializer"]) | 'Read From GCS' >> beam.io.ReadFromTextWithFilename(
        "gs://<bkt-name>/20220826/*/dlp*", skip_header_lines=1) | beam.ParDo(
        GetFileNameFromWildcard()) | beam.io.WriteToText(
        'df_out.csv')
    

Это может быть очень поздняя публикация по вышеуказанному вопросу, но я хотел добавить ответ с помощью классов, связанных с Beam.

Это также можно рассматривать как извлеченный код из решения, предоставленного @Reza Rokni.

PCollection<String> listOfFilenames =
    pipe.apply(FileIO.match().filepattern("gs://apache-beam-samples/shakespeare/*"))
        .apply(FileIO.readMatches())
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (FileIO.ReadableFile file) -> {
                      String f = file.getMetadata().resourceId().getFilename();
                      System.out.println(f);
                      return f;
                    }));

pipe.run().waitUntilFinish();

Над PCollection<String> будет иметь список файлов, доступных в любом указанном каталоге.

Другие вопросы по тегам