Кстрим-Кстрим объединение на основе общего поля

Мы хотим выполнить соединение Kstream-Kstream на основе общего поля (первичного ключа). В настоящее время с приведенным ниже кодом мы получаем результат как просто слияние 2 потоков без каких-либо ограничений первичного ключа.

      val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)


userRegions.join(regionMetrics)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

Не могли бы вы подсказать, как присоединиться к 2 потокам на основе общего поля / столбца.

1 ответ

Чтобы объединить 2 потока на основе общего поля / столбца, вы можете использовать функцию selectKey(), я предоставлю вам фрагмент, который может вам помочь.

      val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)

// New code
val userRegionsWithKeys = userRegions.selectKey(new ValueMapper (String key, String Value) {
   // create your key here and return it
   // Please the syntax for Scala
    @override
    void apply (String key, String value) {
       return "key that you want" 
    }
});

val regionMetricsWithKeys = regionMetrics.selectKey(new ValueMapper (String key, String Value) {
   // create your key here and return it
   // Please the syntax for Scala
    @override
    void apply (String key, String value) {
       return "key that you want" 
    }
});


userRegionsWithKeys .join(regionMetricsWithKeys )(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

Надеюсь, это решение вам поможет.

Спасибо

Другие вопросы по тегам