как обрабатывать отношения "один ко многим" с помощью операций соединения потоков kafka

Не могли бы вы помочь мне, как добиться этого с помощью потоков Kafka?

Сценарий: группировка всех счетов-фактур для данных заказа. При потоковой передаче в реальном времени может возникнуть задержка в получении счетов. поэтому мы хотим подождать 20 минут, чтобы сгруппировать все счета, прежде чем присоединяться к нему.

Пример: заказ "x" содержит 3 счета, которые могут быть получены в течение 20 минут.

Ожидаемый результат: Порядок и 3 счета-фактуры должны быть доступны в виде отдельных данных в выходной теме.

Для этого у нас есть топология ниже.

  1. мы имеем поток заказов и поток счета отдельно

  2. Мы группируем счета по ключу заказа. мы устанавливаем 20-минутные переворачивающиеся окна

  3. Объединение данных заказа с сформированной группой счетов-фактур

  4. запись вывода в новую тему

Проблема: Шаг 3 не ждет завершения шага 2. Регистрация выполняемой операции, как только будет получен заказ. поэтому мы не получаем ожидаемый результат.

мы пытались добиться того же, используя окна соединения. но поскольку окна соединения являются скользящими окнами, мы получаем повторяющиеся данные в выходной теме.

В приведенном выше примере, если мы используем соединяемые окна вместо переворачивающихся окон, мы получаем 3 выходных данных, для которых заказ имеет 1 счет-фактуру, 2 счета-фактуры и 3 счета-фактуры соответственно.

Пожалуйста, помогите мне решить эту проблему или предложите альтернативный подход

Фрагмент кода:

 KTable<Windowed<String>, List<InvoiceList>> invoiceList= invoiceStream
                .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(1200)))
                .aggregate(() -> new ArrayList<InvoiceList>(),
                        (key, newValue, agg) -> {
                            new KeyValue<>(key, agg.add(newValue));
                            return agg;
                        },
                        Materialized.as("invoice-list").with(Serdes.String(), new ArrayListSerde<InvoiceList>(AppSerdes.InvoiceList())))
 
KStream<String, Order> orderOutput=
 
                orderStream.join(invoiceList, Joiner);
 
       
        orderOutput.to(AppConfig.OutputTopic.OUTPUT_ORDER,Produced.with(Serdes.String(), AppSerdes.Order()));

2 ответа

Я предполагаю, что сначала идет заказ, а затем счет (-а), а не наоборот. Если мое предположение верно, то ваша логика не сработает. Поскольку к тому времени, когда заказ поступает в ваш KStream, счетов-фактур может не быть, и, следовательно, соединение не получает их. Пожалуйста, помните, KStream-KTable соединения являются не-оконном присоединяется и может быть использована как против поисков KTable (изменения потока).

В нашем случае это соединение работает. Итак, мы получили его как два отдельных потока и добавили настраиваемую логику для потребителя для обработки нашего варианта использования.

Благодаря!

Другие вопросы по тегам