Java Apache Beam: как добавить новые строки в коллекцию PCollection
В следующем CSV мне нужно добавить для него новые значения строк.
Код Java:
public static void main(String[] args) throws IOException {
final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");
File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");
Schema schema = new Schema.Parser().parse(schemaFile);
Pipeline pipeline = Pipeline.create();
// Reading schema
org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
final PCollectionTuple tuples = pipeline
// Reading csv input
.apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
// Reading files that matches conditions //PRashanth needs to be looked at
.apply("2", FileIO.readMatches())
// Reading schema and validating with schema and converts to row and returns
// valid and invalid list
.apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
TupleTagList.of(invalidTag())));
// Fetching only valid rows
final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
RowAddition rowAddition = new RowAddition();
final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));
;
// Как совместить эти два объекта PCollection?
PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));
pipeline.run().waitUntilFinish();
System.out.println("The end");
}
}
Логика добавления строк
class RowAddition extends DoFn<Row, Row> {
private static final long serialVersionUID = -8093837716944809689L;
@ProcessElement
public void processElement(ProcessContext context) {
org.apache.beam.sdk.schemas.Schema beamSchema=null;
try {
beamSchema = AvroUtils.toBeamSchema(new Schema.Parser().parse(new File("src/main/resources/addRow/schema_transform.avsc")));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Row row = context.element();
Row newRow = row.withSchema(beamSchema).addValues("01", "30/7/2021", 999.0).build();
context.output(newRow);
}
}
Я ссылаюсь на эту ссылку
1 ответ
Вы ищете преобразование Flatten .Это берет любое количество существующих коллекций PCollection и создает новую коллекцию PCollection с объединением их элементов. Для совершенно новых элементов вы можете использовать Create или другой PTransform для вычисления новых элементов на основе старых.