Java Apache Beam: как добавить новые строки в коллекцию PCollection

В следующем CSV мне нужно добавить для него новые значения строк.

Код Java:

          public static void main(String[] args) throws IOException {
        final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");

        File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");

        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that matches conditions //PRashanth needs to be looked at
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows

        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
        RowAddition rowAddition = new RowAddition();
        final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));
        ;

// Как совместить эти два объекта PCollection?

              PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
        pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }
}

Логика добавления строк

      class RowAddition extends DoFn<Row, Row> {

    private static final long serialVersionUID = -8093837716944809689L;
    

    @ProcessElement
    public void processElement(ProcessContext context) {
        org.apache.beam.sdk.schemas.Schema beamSchema=null;
        try {
            beamSchema = AvroUtils.toBeamSchema(new Schema.Parser().parse(new File("src/main/resources/addRow/schema_transform.avsc")));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        Row row = context.element();
        Row newRow = row.withSchema(beamSchema).addValues("01", "30/7/2021", 999.0).build();
        context.output(newRow);
    }

}

Я ссылаюсь на эту ссылку

https://beam.apache.org/documentation/pipelines/design-your-pipeline/#:~:text=Merging%20PCollections,-Often%2C%20after%20you&amp;amp;text=You%20can%20do%20so%20by,join % 20между% 202% 20PCollection% 20сек .

1 ответ

Вы ищете преобразование Flatten .Это берет любое количество существующих коллекций PCollection и создает новую коллекцию PCollection с объединением их элементов. Для совершенно новых элементов вы можете использовать Create или другой PTransform для вычисления новых элементов на основе старых.

Другие вопросы по тегам