Как посчитать количество строк во входном файле обработки файла Google Dataflow?

Я пытаюсь подсчитать количество строк во входном файле и использую Cloud dataflow Runner для создания шаблона. В приведенном ниже коде я читаю файл из корзины GCS, обрабатываю его, а затем сохраняю вывод в экземпляре Redis.

Но я не могу подсчитать количество строк во входном файле.

Основной класс

 public static void main(String[] args) {
    /**
     * Constructed StorageToRedisOptions object using the method PipelineOptionsFactory.fromArgs to read options from command-line
     */
    StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StorageToRedisOptions.class);

    Pipeline p = Pipeline.create(options);
    p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
            .apply("Transforming data...",
                    ParDo.of(new DoFn<String, String[]>() {
                        @ProcessElement
                        public void TransformData(@Element String line, OutputReceiver<String[]> out) {
                            String[] fields = line.split("\\|");
                            out.output(fields);
                        }
                    }))
            .apply("Processing data...",
                    ParDo.of(new DoFn<String[], KV<String, String>>() {
                        @ProcessElement
                        public void ProcessData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
                            if (fields[RedisIndex.GUID.getValue()] != null) {

                                out.output(KV.of("firstname:"
                                        .concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("lastname:"
                                        .concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("dob:"
                                        .concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));

                                out.output(KV.of("postalcode:"
                                        .concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));

                            }
                        }
                    }))
            .apply("Writing field indexes into redis",
            RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
                    .withEndpoint(options.getRedisHost(), options.getRedisPort()));
    p.run();

}

Пример входного файла

xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666

Команда для выполнения конвейера

mvn compile exec:java \
  -Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
  -Dexec.args="--project=my-project-id \
  --jobName=dataflow-job \
  --inputFile=gs://my-input-bucket/*.txt \
  --redisHost=127.0.0.1 \
  --stagingLocation=gs://pipeline-bucket/stage/ \
  --dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template \
  --runner=DataflowRunner"

Я попытался использовать приведенный ниже код из решения Stackru, но мне это не помогло.

PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);

Я также просмотрел документацию Apache Beam, но не нашел ничего полезного. Любая помощь по этому поводу будет очень признательна.

2 ответа

Решение

Я решил эту проблему, добавив Count.globally() и обращаясь к PCollection<String> после того, как конвейер прочитает файл.

Я добавил следующий код:

PCollection<String> lines = p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()));

 lines.apply(Count.globally()).apply("Count the total records", ParDo.of(new RecordCount()));

где я создал новый класс (RecordCount.java), который расширяет DoFn, который просто регистрирует счетчик.

RecordCount.java

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordCount extends DoFn<Long, Void> {

    private static final Logger LOGGER = LoggerFactory.getLogger(RecordCount.class);

    @ProcessElement
    public void processElement(@Element Long count) {
       LOGGER.info("The total number of records in the input file is: ", count);

        }
    }

}

Правильный способ сделать это - записать счетчик в систему хранения, используя соединитель Beam (или используя Beam ParDo). Результат конвейера не доступен напрямую для основной программы, так как Beam runner может распараллеливать вычисления, и выполнение может происходить не на одном компьютере.

Например (псевдокод):

    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally())
     .apply(ParDo(MyLongToStringParDo()))
     .apply(TextIO.Write.to("gs://..."));

Если вам нужно обрабатывать вывод непосредственно в основной программе, вы можете читать из GCS с помощью клиентской библиотеки после завершения программы Beam (обязательно укажите p.run().waitUntilFinish()в этом случае). В качестве альтернативы вы можете переместить свое вычисление (которое требует подсчета) в Луч PTransform и сделайте это частью своего конвейера.