Путаница в реализации KStream / Table в Spring Boot

Я пытаюсь получить образец работающего "действия" в Spring-Boot kafka stream, и я, кажется, в итоге запутался:)

Я получаю данные JSON по проводам. Я построил схему в avro, которую я использую для сериализации данных:


{
  "UID": "XJ3_112",
  "type": "11X",
  "state": "PLATFORM_INITIALIZED",
  "fuelremaining": 0,
  "latitude": 50.1232,
  "longitude": -119.257,
  "altitude": 0,
  "time": "2018-07-18T00:00:13.9966Z"
}

{
  "platformUID": "BSG_SS_1_4",
  "type": "OB_334_11",
  "state": "ON_STATION",
  "fuelremaining": -1,
  "latitude": 56.1623,
  "longitude": -44.5614,
  "altitude": 519174,
  "time": "2018-07-18T00:01:43.0871Z"
}

Это насколько я получил:

@Component
class KStreamTransformer {

    @Autowired
    private lateinit var objectMapper: ObjectMapper

    @StreamListener(MyKafkaStreams.INPUT)
    @SendTo(MyKafkaStreams.OUTPUT)
    fun process(input: KStream<*, TestEntity>) : KStream<*, TestEntity> {

        return input.flatMapValues{
            value ->
            val out = Arrays.asList(value)

            out
        }.groupBy() ???
    }
}

Я надеюсь создать KTable, который выглядит следующим образом:

| PlatformUID| состояние |Lat|Lon|Alt| |-----------|-----|---|---|---|

И вот тут я запутался.

Я предполагаю, что хочу сделать GroupBy на PlatformUID поле, но мне неясно, как на самом деле двигаться вперед.

Может ли кто-нибудь указать мне правильное направление?

Я думаю, что я ищу, чтобы взять input поток и превратить его в KTable с ключом value.getUID() и значение является значением, которое было раньше

1 ответ

Решение

Если platformUID уже является ключом, который использует производитель данных, вы можете использовать

KTable ktable = kstream
                 .groupByKey()
                 .reduce((oldValue, newValue) -> newValue)

Если нет, KeyValueMapper должен быть помещен в .groupBy()и это выглядит как

KTable ktable = kstream
                 .groupBy((k, v) -> v.getPlatformUID())
                 .reduce((oldValue, newValue) -> newValue)

Это создаст внутреннюю тему, которая переделит исходную тему с новым ключом.

Для Java 7 синтаксис KeyValueMapper выглядит следующим образом:

KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
        public K apply(K key, V1 value) {
            return key;
        }
    };
Другие вопросы по тегам