Кстрим-Кстрим объединение на основе общего поля
Мы хотим выполнить соединение Kstream-Kstream на основе общего поля (первичного ключа). В настоящее время с приведенным ниже кодом мы получаем результат как просто слияние 2 потоков без каких-либо ограничений первичного ключа.
val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)
userRegions.join(regionMetrics)(
((regionValue, metricValue) => regionValue + "/" + metricValue),
JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)
Не могли бы вы подсказать, как присоединиться к 2 потокам на основе общего поля / столбца.
1 ответ
Чтобы объединить 2 потока на основе общего поля / столбца, вы можете использовать функцию selectKey(), я предоставлю вам фрагмент, который может вам помочь.
val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)
// New code
val userRegionsWithKeys = userRegions.selectKey(new ValueMapper (String key, String Value) {
// create your key here and return it
// Please the syntax for Scala
@override
void apply (String key, String value) {
return "key that you want"
}
});
val regionMetricsWithKeys = regionMetrics.selectKey(new ValueMapper (String key, String Value) {
// create your key here and return it
// Please the syntax for Scala
@override
void apply (String key, String value) {
return "key that you want"
}
});
userRegionsWithKeys .join(regionMetricsWithKeys )(
((regionValue, metricValue) => regionValue + "/" + metricValue),
JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)
Надеюсь, это решение вам поможет.
Спасибо