Apache Beam обновляет значения текущей строки на основе значений из предыдущей строки
Значения обновления Apache Beam основаны на значениях из предыдущей строки
Я сгруппировал значения из файла CSV. Здесь, в сгруппированных строках, мы находим несколько пропущенных значений, которые необходимо обновить на основе значений из предыдущей строки. Если первый столбец строки пуст, нам нужно обновить его на 0.
Я могу сгруппировать записи, но не могу понять логику обновления значений. Как мне этого добиться?
Записи
Код до сих пор:
public class GroupByTest {
public static void main(String[] args) throws IOException {
System.out.println("We are about to start!!");
final File schemaFile = new File(
"C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");
File csvFile = new File(
"C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
Schema schema = new Schema.Parser().parse(schemaFile);
Pipeline pipeline = Pipeline.create();
// Reading schema
org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
final PCollectionTuple tuples = pipeline
// Reading csv input
.apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
// Reading files that matches conditions //PRashanth needs to be looked at
.apply("2", FileIO.readMatches())
// Reading schema and validating with schema and converts to row and returns
// valid and invalid list
.apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
TupleTagList.of(invalidTag())));
// Fetching only valid rows
final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
// Transformation
//Convert row to KV
final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
.aggregateField("balance", Sum.ofDoubles(), "balances");
final PCollection<Row> aggregagte = rows.apply(combine);
PCollection<String> pOutput=aggregagte.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
pipeline.run().waitUntilFinish();
System.out.println("The end");
}
private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
if (logicalType != null) {
type = logicalType.getName();
}
switch (type) {
case "string":
return row.getString(columnName);
case "int":
return Objects.requireNonNull(row.getInt32(columnName)).toString();
case "bigint":
return Objects.requireNonNull(row.getInt64(columnName)).toString();
case "double":
return Objects.requireNonNull(row.getDouble(columnName)).toString();
case "timestamp-millis":
return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();
default:
return row.getString(columnName);
}
}
}
1 ответ
Beam не дает никаких гарантий порядка, поэтому вам придется сгруппировать их, как вы это делали.
Но насколько я понимаю из вашего случая, вам нужно сгруппировать по
customerId
. После этого вы можете применить PTransform, например ParDo, для сортировки сгруппированных строк по
date
и заполните недостающие значения, как хотите.