Как посчитать количество строк во входном файле обработки файла Google Dataflow?
Я пытаюсь подсчитать количество строк во входном файле и использую Cloud dataflow Runner для создания шаблона. В приведенном ниже коде я читаю файл из корзины GCS, обрабатываю его, а затем сохраняю вывод в экземпляре Redis.
Но я не могу подсчитать количество строк во входном файле.
Основной класс
public static void main(String[] args) {
/**
* Constructed StorageToRedisOptions object using the method PipelineOptionsFactory.fromArgs to read options from command-line
*/
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StorageToRedisOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()))
.apply("Transforming data...",
ParDo.of(new DoFn<String, String[]>() {
@ProcessElement
public void TransformData(@Element String line, OutputReceiver<String[]> out) {
String[] fields = line.split("\\|");
out.output(fields);
}
}))
.apply("Processing data...",
ParDo.of(new DoFn<String[], KV<String, String>>() {
@ProcessElement
public void ProcessData(@Element String[] fields, OutputReceiver<KV<String, String>> out) {
if (fields[RedisIndex.GUID.getValue()] != null) {
out.output(KV.of("firstname:"
.concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("lastname:"
.concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("dob:"
.concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()]));
out.output(KV.of("postalcode:"
.concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()]));
}
}
}))
.apply("Writing field indexes into redis",
RedisIO.write().withMethod(RedisIO.Write.Method.SADD)
.withEndpoint(options.getRedisHost(), options.getRedisPort()));
p.run();
}
Пример входного файла
xxxxxxxxxxxxxxxx|bruce|wayne|31051989|444444444444
yyyyyyyyyyyyyyyy|selina|thomas|01051989|222222222222
aaaaaaaaaaaaaaaa|clark|kent|31051990|666666666666
Команда для выполнения конвейера
mvn compile exec:java \
-Dexec.mainClass=com.viveknaskar.DataFlowPipelineForMemStore \
-Dexec.args="--project=my-project-id \
--jobName=dataflow-job \
--inputFile=gs://my-input-bucket/*.txt \
--redisHost=127.0.0.1 \
--stagingLocation=gs://pipeline-bucket/stage/ \
--dataflowJobFile=gs://pipeline-bucket/templates/dataflow-template \
--runner=DataflowRunner"
Я попытался использовать приведенный ниже код из решения Stackru, но мне это не помогло.
PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
p.apply(TextIO.Read.from("gs://..."))
.apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);
Я также просмотрел документацию Apache Beam, но не нашел ничего полезного. Любая помощь по этому поводу будет очень признательна.
2 ответа
Я решил эту проблему, добавив
Count.globally()
и обращаясь к
PCollection<String>
после того, как конвейер прочитает файл.
Я добавил следующий код:
PCollection<String> lines = p.apply("Reading Lines...", TextIO.read().from(options.getInputFile()));
lines.apply(Count.globally()).apply("Count the total records", ParDo.of(new RecordCount()));
где я создал новый класс (RecordCount.java), который расширяет DoFn
RecordCount.java
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RecordCount extends DoFn<Long, Void> {
private static final Logger LOGGER = LoggerFactory.getLogger(RecordCount.class);
@ProcessElement
public void processElement(@Element Long count) {
LOGGER.info("The total number of records in the input file is: ", count);
}
}
}
Правильный способ сделать это - записать счетчик в систему хранения, используя соединитель Beam (или используя Beam ParDo). Результат конвейера не доступен напрямую для основной программы, так как Beam runner может распараллеливать вычисления, и выполнение может происходить не на одном компьютере.
Например (псевдокод):
p.apply(TextIO.Read.from("gs://..."))
.apply(Count.<String>globally())
.apply(ParDo(MyLongToStringParDo()))
.apply(TextIO.Write.to("gs://..."));
Если вам нужно обрабатывать вывод непосредственно в основной программе, вы можете читать из GCS с помощью клиентской библиотеки после завершения программы Beam (обязательно укажите
p.run().waitUntilFinish()
в этом случае). В качестве альтернативы вы можете переместить свое вычисление (которое требует подсчета) в Луч
PTransform
и сделайте это частью своего конвейера.