Описание тега kafka-producer-api

Используйте для вопросов, связанных с API-интерфейсом производителя Apache Kafka. Любой вопрос, связанный с продюсированием в темах Kafka. Сбои и восстановление производителя, идемпотентность и транзакционный API.
1 ответ

Как разбить одну тему Кафки на несколько небольших тем Кафки?

У меня есть одна главная тема Кафки, которая получает данные временных рядов. Мне нужно взять каждое значение, которое входит в эту тему, скопировать его и отправить в одну из множества отдельных тем на основе значения в его ключе. Поскольку это дан…
1 ответ

Как отправить пользовательский объект в тему Кафки с продюсером

Я хотел бы отправить свой класс Account с Producer в мою тему Kafka, после чего я объединюсь с Kafka Stream. Тем не менее, я не могу отправить объект, я получаю сообщение об ошибке: Вызывается: org.apache.kafka.common.KafkaException: bank.Account не…
1 ответ

Для регистратора не найдено ни одного дополнительного приложения (org.apache.kafka.clients.producer.ProducerConfig)

Я пишу код, в котором я пытаюсь использовать сообщения, используя kafka и spark. Но мой код не работает. Вот мой код: import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord } import org.apache.spark.streaming._ impo…
2 ответа

При использовании Kafka High Level ConsumerConnector, где будут сохраняться смещения?

Как следует из нового документа, сама Kafka позаботится об управлении смещениями при использовании API высокого уровня по умолчанию. Но когда я создаю ConsumerConnector, как показано ниже, он все равно требует от меня предоставления свойства zookeep…
0 ответов

Как передать несколько данных в кафку производителя?

Необходимо передать несколько данных (объект) вместе с данными файла, Есть ли сериализатор для передачи объекта или какие-либо пути? Вот код, который просто передает данные файла, props.put("bootstrap.servers", "localhost:9092"); props.put("metadata…
1 ответ

Как мне установить расширение Kafka для PHP?

Это расширение, которое я пытаюсь установить: https://github.com/EVODelavega/phpkafka Сообщения, передаваемые в очередь, должны быть в формате JSON. В настоящее время я получаю ошибки установки: 1. В инструкциях меня просят установить librdkafka. 2.…
1 ответ

kafka avro схема - 500 внутренняя ошибка сервера после 1000 сообщений

Я использую слитый прокси 0.9 rest proxy и системный реестр для создания сообщений avro в теме. Я использую модуль узла "kafka-rest". После 1000 сообщений он начинает выдавать 500 внутренних ошибок сервера. После прочтения некоторых сообщений я пони…
25 авг '16 в 17:58
1 ответ

Создать несколько потребителей для одной и той же темы в Кафке

Я новичок и могу видеть один пример с одним потребителем в репозитории github ниже, но есть ли идеи, как создать несколько потребителей для одной и той же темы? https://github.com/confluentinc/confluent-kafka-go/tree/master/examples Любая фабрика по…
1 ответ

Обновление клиента Kafka с 0.8.2.0 до 0.11.0.0

В настоящее время в моей компании мы осуществляем миграцию с Kafka 0.8 до 0.11, этапы миграции брокеров и четко указаны в документации по kafka здесь. Я застрял в том, что, модернизируя клиентов kafka (производителей, потребителей, потоковое вещание…
27 сен '17 в 12:32
2 ответа

Kafka Consumer не работает с транзакционной семантикой (изоляция. Level = read_commited)

Kafka Consumer не может использовать ни одно сообщение, если у него есть транзакционная семантика в свойствах. Но когда я удаляю это свойство или обновляю это свойство до read_uncommited, его сообщения потребляют. Ниже приведены мои потребительские …
1 ответ

Golang Kafka не использует все сообщения

Первый пакет:- Я пытаюсь извлечь данные из 100 плоских файлов и загрузить их в массив и вставить их в производитель kafka один за другим в виде байтового массива. Вторая партия:- Я потребляю у потребителя kafka и затем вставляю их в базу данных NoSQ…
0 ответов

Ошибка производителя Kafka Quickstart в Mac OS X

Я продолжаю испытывать раздражающую, периодически возникающую проблему Publisher, пытаясь следовать Kafka Quickstart - Kafka 1.0.0 Зоопарк начинает хорошо - binding to port 0.0.0.0/0.0.0.0:2181 Кафка брокер начинает нормально - Awaiting socket conne…
20 фев '18 в 14:45
2 ответа

Как Кафка обращается с производителями, когда все брокеры рушатся?

При изучении и тестировании Kafka возникло сомнение: Как Кафка управляет производством записей, когда все не работает? Я знаю, что у потребителей есть тайм-аут на опросах, и он будет время от времени опрашивать новые записи, пока Кафка не вернется в…
12 сен '17 в 16:11
2 ответа

KafkaProducer отправляет список сообщений или разбивает список на отдельные сообщения

Можно ли группировать 100 сообщений в один объект и отправлять эти объекты в kafka, или я должен разделить эти 100 сообщений на отдельные сообщения и затем поместить их в kafka. Скажем, например, у меня есть объект, содержащий список. Я могу помести…
16 апр '18 в 19:02
1 ответ

Уменьшить KafkaProducer отправить время асинхронного ответа

Мой Java KafkaProducer способен отправлять сообщения и получать асинхронный обратный вызов в обоих случаях успеха / сбоя должным образом. Всякий раз, когда происходит сбой, в Call back я получаю сообщение "Не удалось обновить метаданные через 60000 …
19 дек '18 в 18:29
1 ответ

Кафка производитель плохая производительность

У меня проблема с производителем кафки. На самом деле я использую Spring Kafka и отправляю сообщения через KafkaTemplate следующим образом: DefaultKafkaProducerFactory<K, V> defaultKafkaProducerFactory = new DefaultKafkaProducerFactory<>…
0 ответов

Производители / брокеры Kafka, не использующие указанный IP

Итак, у меня кластер из 6 машин. Эти машины имеют 2 IP-адреса (публичный и частный). Частные IP-адреса используют высокоскоростную сеть (1 Гбит / с), а публичные используют более медленную сеть (120 Мбит / с). При настройке моего кластера kafka/zook…
31 дек '16 в 23:29
0 ответов

Пикафка 2.3.1 Производитель Hang

Мы используем pykafka 2.3.1 и посмотреть, как продюсер зависает время от времени. Сообщение об ошибке: Сообщение не доставлено!! UnknownTopicOrPartition("Произошел сбой при получении запроса на имя темы / от 0 до {ip: порт} с кодом ошибки 3". Мы исп…
01 сен '18 в 13:09
1 ответ

Будущие метаданные производителя Kafka в обратном вызове

В моем приложении при отправке сообщений я использую метаданные в обратном вызове, чтобы сохранить смещение записи для будущего использования. Однако иногда metadata.offset() возвращает -1, что усложняет ситуацию позже. Почему это происходит, и есть…
24 янв '19 в 11:48
0 ответов

Лучший способ реализовать производитель кафки в ruby ​​без потери данных

Каков наилучший способ реализации производителя Kafka в RoR с использованием гема ruby-kafka (0.7.4) без потери данных при сбое экземпляра? Сообщение получается из нескольких потоков. Таким образом, простой вызов метода delivery_message не сработает…