Как сделать синхронизацию нескольких логов в кафке?

Предположим, у меня есть 2 типа журналов, которые имеют общее поле "uid", и я хочу вывести журнал, если журнал обоих из этих 2 журналов, содержащих uid, поступает, как соединение, возможно ли это для Кафки?

1 ответ

Решение

Да, конечно. Проверьте Kafka Streams, в частности, DSL API. Это выглядит примерно так:

 StreamsBuilder builder = new StreamsBuilder();

 KStream<byte[], Foo> fooStream = builder.stream("foo");

 KStream<byte[], Bar> barStream = builder.stream("bar");

 fooStream.join(barStream,
                (foo, bar) -> {
                    foo.baz = bar.baz;
                    return foo;
                },
                JoinWindows.of(1000))
          .to("buzz");

Это простое приложение использует две входные темы ("foo" и "bar"), присоединяет их и записывает их в тему "buzz". Поскольку потоки бесконечны, при объединении двух потоков необходимо указать окно объединения (на 1000 миллисекунд выше), которое представляет собой относительную разницу во времени между двумя сообщениями в соответствующих потоках, чтобы сделать их пригодными для объединения.

Вот более полный пример: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

А вот документация: https://docs.confluent.io/current/streams/developer-guide/dsl-api.html. Вы найдете много разных типов соединений, которые вы можете выполнить:

Важно отметить, что хотя приведенный выше пример будет детерминистически синхронизировать потоки - если вы сбросите и повторно обработаете топологию, вы получите один и тот же результат каждый раз, - не все операции объединения в потоках Kafka являются детерминированными. Начиная с версии 1.0.0 и ранее, примерно половина не является детерминированной и может зависеть от порядка данных, полученных из базовых разделов темы. В частности, внутренний KStream-KStream и все KTable-KTable соединения являются детерминированными. Другие объединения, как и все KStream-KTable присоединяется и левый / внешний KStream-KStream объединения являются недетерминированными и зависят от порядка данных, потребляемых потребителями. Имейте это в виду, если вы разрабатываете свою топологию для повторной обработки. Если вы используете эти недетерминированные операции, когда ваша топология работает в реальном времени, порядок событий по мере их поступления будет давать один результат, но если вы обрабатываете свою топологию, вы можете получить другой результат. Обратите внимание также на такие операции, как KStream#merge() также не дают детерминированных результатов. Для получения дополнительной информации об этой проблеме см. Почему моя топология Kafka Streams неправильно воспроизводит / воспроизводит? и этот список рассылки

Другие вопросы по тегам