Описание тега kafka-join
0
ответов
Встроенный Kafka: KTable+KTable leftJoin производит дубликаты записей
Я прихожу в поисках знания тайного. Во-первых, у меня есть две пары тем, по одной теме в каждой паре, которая входит в другую тему. Два KTables формируются последними темами, которые используются в KTable+KTable leftJoin. Проблема в том, что leftJoi…
18 июл '18 в 17:09
1
ответ
API Kafka Streams: я присоединяюсь к двум KStreams из empmodel
final KStream<String, EmpModel> empModelStream = getMapOperator(empoutStream); final KStream<String, EmpModel> empModelinput = getMapOperator(inputStream); // empModelinput.print(); // empModelStream.print(); empModelStream.join(empModel…
27 фев '17 в 06:05
1
ответ
KStream to KTable Left Join возвращает нулевое значение
В настоящее время я пытаюсь использовать соединение KStream to KTable для обогащения темы Kafka. Для подтверждения концепции у меня в настоящее время есть Kafka Stream с около 600 000 записей, у всех из которых есть один и тот же ключ, и KTable, соз…
19 июл '18 в 19:47
2
ответа
Kafka поток присоединяется с определенным ключом в качестве ввода
У меня есть 3 разные темы с 3 файлами Avro в реестре схемы, я хочу транслировать эти темы, объединять их и записывать в одну тему. проблема в том, что ключ, к которому я хочу присоединиться, отличается от ключа, в который я записываю данные в каждую…
23 янв '17 в 16:21
1
ответ
Kafka Stream работает с JoinWindow для воспроизведения данных
У меня есть 2 потока данных, и я хочу иметь возможность присоединиться к ним в течение 1 месяца, скажем. Когда у меня есть живые данные, с KStream все просто и весело. Я сделал что-то вроде этого; KStream<String, GenericRecord> stream1 = build…
23 янв '17 в 16:53
1
ответ
Kafka Streams объединяются по ключу со сложным условием
Я пытаюсь присоединиться KStream с GlobalKTable по ключу, но с определенной логикой. StreamsBuilder builder = new StreamsBuilder(); KStream<String, Integer> stream = builder.stream(inputTopic1); // key = "ABC" GlobalKTable<String, Integer&g…
13 дек '19 в 19:37
1
ответ
Kafka KStream присоединиться к KStream | перезапустить производительность
Я планирую объединить две темы как KStreams в течение длительного периода (~1 неделя). Предполагая, что в этом окне будут накоплены сотни миллионов записей, сколько времени потребуется присоединяющемуся потребителю для перезапуска? Я спрашиваю об эт…
03 янв '20 в 14:32
0
ответов
Kafka Stream Joins, как указать разные параметры конфигурации для левого и правого потоков
final Properties kafkaStreamConfiguration = new Properties(); kafkaStreamConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID); kafkaStreamConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, APPLICATION_ID); .. final StreamsBuilder bu…
30 апр '20 в 16:00
2
ответа
как обрабатывать отношения "один ко многим" с помощью операций соединения потоков kafka
Не могли бы вы помочь мне, как добиться этого с помощью потоков Kafka? Сценарий: группировка всех счетов-фактур для данных заказа. При потоковой передаче в реальном времени может возникнуть задержка в получении счетов. поэтому мы хотим подождать 20 …
21 сен '20 в 18:52
1
ответ
Kafka - объединение данных из двух разных потоков, когда данные поступают в разное время
У меня есть сценарий, в котором у нас есть два разных потока, и мы получаем данные о них в два разных момента, и мне нужно присоединиться к ним на основе временной метки, которая присутствует в значении. Я попытаюсь объяснить это на примере ниже. in…
15 сен '20 в 15:36
1
ответ
Присоединяются ли KTable и KStream к публикации новой записи при обновлении KTable?
Я реализовал в своем приложении соединение KTable и KStream и ожидаю вывода сообщений в следующих случаях; Новое сообщение в KStream и соответствующая запись в KTable В KStream пришло обновленное сообщение, и в KTable есть соответствующая запись. Ко…
10 фев '21 в 05:50
0
ответов
Самостоятельное присоединение к потоку kafka с обновленным временем потока
У меня есть поток со статусом сайта (подключен / отключен), агрегированный за минуту, где агрегирование подавлено, и для каждого минутного ключа у нас есть только одна запись. Теперь я хочу написать в новую «тревожную» тему сообщения, когда у нас ес…
17 мар '21 в 17:07
1
ответ
Kafka- присоединяйтесь к KStream и KTable по составному ключу
У меня две темы - и plan Тема имеет составной ключ (avro), основанный на двух столбцах, +. supplier тема введена в столбец supplierId и он содержит столбец planCode но нет memberAge. KStream<String, GenericRecord> supplier = builder.stream(sup…
14 апр '21 в 11:34
0
ответов
KTable-Ktable Join вызывает исключение нулевого указателя
Я столкнулся с исключением нулевого указателя при объединении двух KTables. final ValueJoiner<topicA, topicB, user> userJoiner = (topicA, topicB) -> { LOG.debug(LOG_PREFIX + "Begin of join of topicA with topicB"); LOG.trace(LOG_PREFIX + "to…
03 июн '21 в 15:52
1
ответ
Кстрим-Кстрим объединение на основе общего поля
Мы хотим выполнить соединение Kstream-Kstream на основе общего поля (первичного ключа). В настоящее время с приведенным ниже кодом мы получаем результат как просто слияние 2 потоков без каких-либо ограничений первичного ключа. val userRegions: KStre…
08 июл '21 в 14:12
0
ответов
Kafka join не показывает результаты для 2 потоков после ключа groupby и уменьшения java
У меня 2 потока из 2 тем. Предположим, что stream1 и stream2. Я делаю как KStream<String,String> Outstream1 = stream1.selectKey((key,value)-> { ......//some extraction from JsonObject of value part return event | '|' | timestamp //these fie…
11 дек '21 в 10:39
0
ответов
Присоединение потоков Kafka: как подождать некоторое время перед отправкой записей?
В настоящее время у нас есть 2 темы потока Kafka, записи которых поступают постоянно. Мы изучаем возможность объединения двух потоков на основе ключа после ожидания окна продолжительностью 5 минут, но с моим текущим кодом я вижу, что записи отправля…
28 дек '21 в 23:20
1
ответ
Разве мы не можем объединить две таблицы и получить данные в Kafka?
Я объединил две таблицы и получил данные, используя исходный коннектор Postgres. Но каждый раз это давало ту же проблему, т.е. Я выполнил тот же запрос в Postgres, и он работает без проблем. Извлечение данных путем объединения таблиц невозможно в Ka…
19 янв '21 в 04:24
1
ответ
Как лучше всего объединить два события из одной темы с помощью Kafka Streams Api?
Я новичок в потоках кафки, и у меня есть следующий сценарий. Существует тема, содержащая записи типа Event, некоторые из них являются дополнительной информацией о фактическом событии (что-то вроде обновления). Эти две записи генерируются почти однов…
02 апр '22 в 13:01
0
ответов
Как реализовать KStream-Ktable leftJoin, используя подход с несколькими ключами (соединение pk-fk и fk-fk) в потоках Kafka?
Я работаю над объединением потока и таблиц в kafka, используя подход левого соединения, поскольку я получаю значения для некоторых соединений, потому что в этом случае соединение реализовано как внешний ключ для первичного ключа, что является рабочи…
14 мар '22 в 14:44