Чтение нескольких файлов.gz и определение того, какая строка принадлежит какому файлу

Я читаю несколько файлов.gz для обработки с использованием потока данных Google. Конечный пункт назначения данных - BigQuery. Таблица BigQuery имеет выделенные столбцы для каждого столбца в CSV-файле в файле.gz. В таблице BQ есть еще один столбец с именем file_name, в котором указано имя файла, к которому принадлежит эта запись. Я читаю файлы с использованием TextIO.Read и выполняю преобразование ParDo на нем. В DoFn есть способ идентифицировать имя файла, которому принадлежит входящая строка.

Мой код выглядит следующим образом:

PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
                .from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));

PCollection<TableRow> formattedResults = logs.apply(ParDo.named("Format").of(new DoFn<String, TableRow>() {}

Обновление 1:

Я сейчас пытаюсь, как показано ниже:

        PCollection<String> fileNamesCollection // this is collection of file names
        GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void processElement(ProcessContext c) throws Exception {
                    ReadableByteChannel readChannel = channelFactory.open(c.element());
                    GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                    BufferedReader br = new BufferedReader(new InputStreamReader(gzip));

                    String line = null;
                    while ((line = br.readLine()) != null) {
                        c.output(KV.of(c.element(), line));
                    }
                }
        }));

Но когда я запускаю эту программу, получаю, что channelFactory не сериализуем, там есть какая-либо фабрика каналов, которая реализует интерфейс Serializable и может использоваться здесь.

Обновление 2: я наконец смог выполнить программу и успешно отправить задание. Спасибо JKFF за помощь. Ниже мой окончательный код, я вставляю его сюда, чтобы он помог другим.

        ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom class
        DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
        loggingOptions.setDefaultWorkerLogLevel(Level.WARN);

        String jobName = "unique_job_name";
        options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);

        Pipeline pipeline = Pipeline.create(options);

        List<String> filesToProcess = new ArrayList<String>();
        for(String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921,Log_20160922 etc
            filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath+fileName));
        }
        // at this time filesToProcess will have all logs files name as Log_2016092101.gz,Log_2016092102.gz,.........,Log_2016092201.gz,Log_2016092223.gz
        PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));

        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Parsing_Files").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    // I have to create _options here because Options, GcsIOChannelFactory are non serializable
                    ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
                    GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
                    ReadableByteChannel readChannel = channelFactory.open(c.element());
                    GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                    BufferedReader br = new BufferedReader(new InputStreamReader(gzip));

                    String line = null;
                    while ((line = br.readLine()) != null) {
                        c.output(KV.of(c.element(), line));
                    }

                    br.close();
                    gzip.close();
                    readChannel.close();
                }
        }));

        // Performing reshuffling here as suggested
        PCollection <KV<String,String>> withFileName = kv.apply(Reshuffle.<String, String>of());

        PCollection<TableRow> formattedResults = withFileName
                .apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String,String>, TableRow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                    KV<String,String> kv = c.element();
                    String logLine = kv.getValue();
                    String logFileName = kv.getKey();

                    // do further processing as you want here
        }));

        // Finally insert in BQ table the formattedResults

1 ответ

Решение

Прямо сейчас ответ - нет. Если вам нужен доступ к именам файлов, к сожалению, лучше всего в этом случае внедрить расширение filepattern и синтаксический анализ файла (как ParDo). Вот несколько вещей, которые вы должны иметь в виду:

В качестве альтернативы вы можете написать свой собственный файловый источник. Тем не менее, в этом конкретном случае (файлы.gz) я бы рекомендовал против этого, потому что этот API в первую очередь предназначен для файлов, которые могут быть прочитаны с произвольным доступом с любого смещения.

Другие вопросы по тегам