Kafka Streams объединяются по ключу со сложным условием
Я пытаюсь присоединиться KStream
с GlobalKTable
по ключу, но с определенной логикой.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"
stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
(key, value) -> key,
(valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});
Например, если ключ = "ABC", то:
- сначала присоединитесь по полному ключу - например, "ABC" = "ABC"
- затем, если они не соединены, соединить по первым двум символам (один символ удален), т.е. "AB" = "AB"
- наконец, попробуйте присоединиться только одним символом - например, "A" = "A"
Кроме того, необходимо знать, по какому условию было выполнено соединение - например, по 3 буквам / по 2 буквам / по 1 букве.
Вопрос в том, возможно ли это вообще или нужно искать обходной путь? Например, сделать копии GlobalKTable с соответствующими ключами (таблица с ключом "ABC", одна с ключом "AB" и одна с ключом "A") и выполнить 3 отдельных соединения? Или, может быть, какие-то другие предложения?
Заранее спасибо!
1 ответ
Возможно использование серии левых объединений для нескольких таблиц (если вы знаете, что часто хотите попробовать объединение). Если соединение выполнено успешно, вы пропускаете следующее соединение. Используя комбинациюleftJoin()
а также branch()
должен позволить вам разделить поток после каждого соединения на "присоединился" и "повторить попытку". В конце концов, вы можетеmerge()
различные потоки результатов вместе, если хотите.