Kafka Streams объединяются по ключу со сложным условием

Я пытаюсь присоединиться KStream с GlobalKTable по ключу, но с определенной логикой.

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC"
    GlobalKTable<String, Integer> table = builder.globalTable(inputTopic2); // key = "ABC"

    stream.join(table, // join first by "ABC" = "ABC", then by "AB" = "AB", then by "A" = "A"
            (key, value) -> key,
            (valueLeft, valueRigth) -> {/* identify by which condition the join was performed */});

Например, если ключ = "ABC", то:

  • сначала присоединитесь по полному ключу - например, "ABC" = "ABC"
  • затем, если они не соединены, соединить по первым двум символам (один символ удален), т.е. "AB" = "AB"
  • наконец, попробуйте присоединиться только одним символом - например, "A" = "A"

Кроме того, необходимо знать, по какому условию было выполнено соединение - например, по 3 буквам / по 2 буквам / по 1 букве.

Вопрос в том, возможно ли это вообще или нужно искать обходной путь? Например, сделать копии GlobalKTable с соответствующими ключами (таблица с ключом "ABC", одна с ключом "AB" и одна с ключом "A") и выполнить 3 отдельных соединения? Или, может быть, какие-то другие предложения?

Заранее спасибо!

1 ответ

Возможно использование серии левых объединений для нескольких таблиц (если вы знаете, что часто хотите попробовать объединение). Если соединение выполнено успешно, вы пропускаете следующее соединение. Используя комбинациюleftJoin() а также branch()должен позволить вам разделить поток после каждого соединения на "присоединился" и "повторить попытку". В конце концов, вы можетеmerge() различные потоки результатов вместе, если хотите.

Другие вопросы по тегам