Как лучше всего объединить два события из одной темы с помощью Kafka Streams Api?
Я новичок в потоках кафки, и у меня есть следующий сценарий. Существует тема, содержащая записи типа Event, некоторые из них являются дополнительной информацией о фактическом событии (что-то вроде обновления). Эти две записи генерируются почти одновременно из источника и имеют один и тот же ключ. Мне нужно объединить эти два события в новое обновленное событие, а затем записать его в новую тему.
До сих пор я сделал что-то вроде следующего. Можно ли объединить два потока kstream, созданных с помощью .split() в одной и той же топологии? Это верный подход?
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> eventsStream = builder.stream("events-topic");
Predicate<String, Event> isEventType = (key, value) -> {
return value.getType().equals("event");
};
Predicate<String, Event> isUpdateType = (key, value) -> {
return value.getType().equals("update");
};
ValueJoiner<Event, Event, Event> joiner = (event, update) -> {
event.setText(event.getText() + update.getAdditionalText());
return event;
};
//split events into sub streams
Map<String, KStream<String, Event>> subStreams = eventsStream
.split(Named.as("type_"))
.branch(isEventType, Branched.as("events"))
.branch(isUpdateType, Branched.as("updates"))
.defaultBranch(Branched.as("other"));
//join sub stream of events with sub stream of updates
subStreams.get("type_events").join(
subStreams.get("type_updates"),
joiner,
JoinWindows.of(Duration.ofMillis(10))
).to("out-topic");
1 ответ
В настоящее время Kafka Streams имеет ограничение, заключающееся в том, что он не поддерживает самосоединение. См. https://issues.apache.org/jira/browse/KAFKA-7497 .
Учитывая это ограничение, разделение темы на две части и последующее их объединение — подходящее решение!