Apache Beam обновляет значения текущей строки на основе значений из предыдущей строки

Значения обновления Apache Beam основаны на значениях из предыдущей строки

Я сгруппировал значения из файла CSV. Здесь, в сгруппированных строках, мы находим несколько пропущенных значений, которые необходимо обновить на основе значений из предыдущей строки. Если первый столбец строки пуст, нам нужно обновить его на 0.

Я могу сгруппировать записи, но не могу понять логику обновления значений. Как мне этого добиться?

Записи

Код до сих пор:

      public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");

        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that matches conditions //PRashanth needs to be looked at
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation
        //Convert row to KV
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("balance", Sum.ofDoubles(), "balances");

        final PCollection<Row> aggregagte = rows.apply(combine);

        PCollection<String> pOutput=aggregagte.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
        
                        
        
        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }

        switch (type) {
        case "string":
            return row.getString(columnName);
        case "int":
            return Objects.requireNonNull(row.getInt32(columnName)).toString();
        case "bigint":
            return Objects.requireNonNull(row.getInt64(columnName)).toString();
        case "double":
            return Objects.requireNonNull(row.getDouble(columnName)).toString();
        case "timestamp-millis":
            return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();

        default:
            return row.getString(columnName);

        }
    }



}

1 ответ

Решение

Beam не дает никаких гарантий порядка, поэтому вам придется сгруппировать их, как вы это делали.

Но насколько я понимаю из вашего случая, вам нужно сгруппировать по customerId. После этого вы можете применить PTransform, например ParDo, для сортировки сгруппированных строк по date и заполните недостающие значения, как хотите.

Другие вопросы по тегам