Описание тега apache-kafka-streams

Связан со встроенным механизмом обработки потоков Apache Kafka под названием Kafka Streams, который представляет собой библиотеку Java для создания приложений распределенной обработки потоков с использованием Apache Kafka.
1 ответ

Kafka Streams: Частичная переработка по ключу

Сценарий: В сценарии веб- сеанса KafkaStreams с неограниченным (или многолетним) хранением, с интерактивными запросами (это может быть просмотрено при необходимости), со многими клиентами, каждый из которых имеет много пользователей (каждый пользова…
1 ответ

Кафка проверяет сообщения в состоянии с обработкой

У меня есть приложение, в котором несколько пользователей могут отправлять операции REST для изменения состояния общих объектов. Когда объект модифицируется, происходит несколько действий (БД, аудит, ведение журнала...). Не все операции действительн…
1 ответ

Как разбить одну тему Кафки на несколько небольших тем Кафки?

У меня есть одна главная тема Кафки, которая получает данные временных рядов. Мне нужно взять каждое значение, которое входит в эту тему, скопировать его и отправить в одну из множества отдельных тем на основе значения в его ключе. Поскольку это дан…
1 ответ

Как выполнить операцию соединения с данными в формате AVRO в потоках kafka с использованием JAVA

ПОТОК-1: [KSTREAM-SOURCE-0000000000]: null, {"id": 1, "name": "john", "age": 26, "updated_at": 1525774480752} [KSTREAM-SOURCE-0000000000]: null, {"id": 2, "name": "jane", "age": 24, "updated_at": 1525774480784} [KSTREAM-SOURCE-0000000000]: null, {"i…
1 ответ

Kafka-streams: установка политики очистки внутренних тем для удаления не работает

Я использую функцию уменьшения потоков kafka, и она создает некоторую внутреннюю тему журнала изменений хранилища состояний (например, app-KSTREAM-REDUCE-STATE-STORE-0000000002-changelog). Я хотел установить байты хранения и изменить политику очистк…
02 сен '18 в 06:48
1 ответ

Кафка Коннект против Акка-стрим Кафка

Я собираюсь внедрить потоковую инфраструктуру для своей организации на основе Kafka и Spark. Тем не менее, я озадачен, решив, как лучше поступить, когда дело доходит до приема данных в Кафке. Многие решения действительно возможны для этой задачи Сам…
0 ответов

Kafka Streams: Магазин не готов

Недавно мы обновили Kafka до v1.1 и Confluent до v4.0. Но после обновления мы столкнулись с постоянными проблемами, касающимися государственных хранилищ. Наше приложение запускает коллекцию потоков, и мы проверяем, готовы ли хранилища состояний, пре…
2 ответа

Интерактивные Запросы Kafka - Доступ к большим данным между экземплярами

Мы планируем запустить приложение kafka streams, распределенное на двух машинах. Каждый экземпляр хранит свои данные Ktable на своем собственном компьютере. Задача, с которой мы сталкиваемся здесь, У нас есть миллион записей в Ktable. Нам нужно пере…
14 июл '17 в 10:35
1 ответ

Применима ли масштабируемость к потоку Kafka, если каждая тема имеет один раздел

Насколько я понимаю, согласно документации потока Kafka, Максимально возможное количество параллельных задач равно максимальному количеству разделов темы среди всех тем в кластере. У меня около 60 тем в кластере Кафки. Каждая тема имеет только один …
16 ноя '17 в 09:16
2 ответа

Как изменить сообщение одной темы Кафки и отправить в другую тему Кафки с помощью Java?

Я создал продюсера, который создает MSG для одной темы A, и мне нужно, чтобы я внес изменения в этот MSG и хочу отправить их в другую тему B, я пытаюсь сделать это с помощью потоков Kafka, но не уверен, что это правильный путь или нет. если для этог…
10 фев '17 в 10:44
1 ответ

Разница между KTable и местным магазином

Какая разница между этими сущностями? Как мне кажется, KTable - простая тема с кафкой compaction политика удаления. Кроме того, если ведение журнала включено для KTable, то есть также журнал изменений, а затем, политика удаления compaction,delete, Л…
24 сен '18 в 22:15
0 ответов

Могут ли потоки Kafka отправлять заголовки в тему вывода

У меня есть POJO "Транзакция", как показано ниже:-> public class Transaction{ private Meta meta; public Headers toHeaders(){ //convert meta into Kakfa headers and return } } Я хочу отправить эти заголовки в тему вывода моего приложения Kafka streams…
24 ноя '17 в 11:37
0 ответов

Ошибки Eclipse Maven при импорте примеров потоков Kafka

Я использую Eclipse Oxygen.1a Release (4.7.1a) на macOS Sierra 10.12.6. Также установите Java 1.8 и Scala 2.12.4. Сделал git-клон следующего хранилища из Confluent: https://github.com/confluentinc/kafka-streams-examples Когда я импортировал это как …
0 ответов

Встроенный Kafka: KTable+KTable leftJoin производит дубликаты записей

Я прихожу в поисках знания тайного. Во-первых, у меня есть две пары тем, по одной теме в каждой паре, которая входит в другую тему. Два KTables формируются последними темами, которые используются в KTable+KTable leftJoin. Проблема в том, что leftJoi…
1 ответ

Пример API потоков Kafka KStream не может быть преобразован в тип

Я работаю через этот учебник Кафка потоков https://kafka.apache.org/0110/documentation/streams/tutorial на машине Centos 7 под управлением JRE 8+ и Kafka 0.11.0.1 Это строка, которая выдает ошибку компиляции KStream<String, String> source = bu…
11 окт '17 в 01:15
1 ответ

Создание и доступ к таблице обновлений в Kstreams

Я хочу получить доступ к потоку данных "A", проверить некоторые условия в таблице "B" и, наконец, обновить состояние таблицы "B". (A,B) ====> check conditions in B ====> finally update B Как я могу реализовать ту же логику в потоках kafka? Из …
21 ноя '17 в 11:08
0 ответов

Kafka Streams: сбросить таймер окна при поступлении нового сообщения

Моя тема Kafka содержит сообщения с полем id. Я хочу агрегировать сообщения по этому идентификатору, используя временное окно, которое должно быть сдвинуто при появлении дубликата. Пример: Я получил сообщение с id = 94, Я хочу дождаться следующего с…
1 ответ

Парсинг данных JSON с использованием потоковых файлов apacke kafka

У меня был сценарий для чтения данных json из моей темы kafka, и, используя версию kafka0.11, мне нужно написать код java для потоковой передачи данных json, присутствующих в теме kafka. Мой ввод - это данные Json, содержащие массивы словарей Теперь…
25 май '18 в 08:32
0 ответов

Kafka Stream Metrics всего окон

Показатели общего процесса мне не понятны. Метрика задокументирована как "Общее количество вызовов процесса". Основываясь на моих наблюдениях, я считаю, что все эти метрики являются оконными (я думаю, что я прочитал где-то 30 секунд в исходном коде)…
09 фев '19 в 19:12
1 ответ

Spark Streaming с данными без схемы

В настоящее время у нас есть настройка конвейера данных, где мы читаем необработанные данные из одной темы Kafka с помощью Logstash и записываем их в ElasticSearch.Данные в этом разделе представлены в формате JSON, но каждая строка может принадлежат…