Описание тега kafka-python

Kafka-Python обеспечивает поддержку протокола низкого уровня для Apache Kafka, а также классов потребителей и производителей высокого уровня. Пакетирование запросов поддерживается протоколом, а также маршрутизация запросов с учетом брокера. Для наборов сообщений также поддерживается сжатие Gzip и Snappy.
1 ответ

Как подписаться на список из нескольких шаблонов подстановочных знаков kafka, используя kafka-python?

Я подписываюсь на Kafka, используя шаблон с подстановочным знаком, как показано ниже. Подстановочный знак представляет динамический идентификатор клиента. consumer.subscribe(pattern='customer.*.validations') Это хорошо работает, потому что я могу вы…
15 сен '16 в 20:56
2 ответа

Как вручную назначить разделы, сохраняя возможность автоматической фиксации?

Я пытаюсь вручную назначить разделы для каждого потребителя в группе потребителей. Однако, когда вы добавляете потребителей в группу, Kafka (или, по крайней мере, kafka-python) предполагает, что вы хотите, чтобы координатор группы выполнял все назна…
3 ответа

Кафка производят. Отправляет сообщение никогда

Я использую Kafka 2.12 и модуль kafka-python в качестве клиента Kafka. Я пытаюсь проверить простой производитель: class Producer(Process): daemon = True def run(self): producer = KafkaProducer(bootstrap_servers='kafka:9092') print("Sending messages.…
14 июл '17 в 00:33
2 ответа

AssertionError: Неназначенный раздел

Я пытаюсь использовать данные из темы, устанавливая смещение, но получаю ошибку подтверждения - from kafka import KafkaConsumer consumer = KafkaConsumer('foobar1', bootstrap_servers=['localhost:9092']) print 'process started' print consumer.partitio…
1 ответ

Python: макетирование Кафки для интеграционных тестов

Я немного новичок в интеграционных тестах. У меня есть две службы, которые передают сообщения друг другу с помощью Kafka. Однако, для моих интеграционных тестов я не обязательно хочу запускать Kafka для запуска своих тестов. Есть ли стандартный спос…
0 ответов

Получение исключения: была предпринята операция с чем-то, что не является сокетом

getting OSError: [WinError 10038 Была предпринята операция с чем-то, что не является сокетом при отправке сообщения на kafka, используя код ниже for line_list in a[list with 25k values]: producer = KafkaProducer(bootstrap_servers=['server'],value_se…
16 май '18 в 10:51
2 ответа

Как использовать кафку на торнадо?

Я пытаюсь сделать простое приложение для чата, используя торнадо на основе этого Но я также хочу использовать Кафку для хранения сообщений. Как я могу это сделать? Теперь я использовал это, чтобы сделать потребителя и каким-то образом он работает, н…
12 фев '16 в 07:50
11 ответов

Кафка в докере не работает

Я пытаюсь использовать wurstmeister\kafka-docker изображение с docker-compose, но у меня есть реальные проблемы с подключением всего. Все сообщения или вопросы, которые я проверяю, похоже, не имеют никаких проблем, но я откровенно потерян. (И есть к…
1 ответ

Kafka Python Client - Как обрабатывать возможные ошибки соединения / тайм-аута?

Я пишу приложение Python, которое подключается к очереди Kafka и отправляет сообщение в очередь. У меня есть следующий рабочий код: def send_msg(self, topic, msg): self.producer = KafkaProducer(bootstrap_servers=['XX.XXX.XX.XXX:XXXX']) future = self…
21 фев '17 в 07:41
2 ответа

Как создать новую тему с пикафкой с разделами и репликацией?

Я хочу иметь возможность программно создать тему в Kafka, используя pykafka. Я знаю, что доступ к TopicDict автоматически создаст тему, если таковой не существует, но я не знаю, как управлять количеством разделов / реплик с этим. Кроме того, у него …
08 янв '16 в 19:31
2 ответа

Как пихать уникальные сообщения в разные разделы темы

Я создал тему в Kafka с количеством разделов 3, теперь во всех этих трех разделах я хочу отправить уникальные сообщения. Есть ли способ сделать это? Я проверил producer.send выдвигает дубликаты сообщений на всех разделах. Для тестирования я использу…
1 ответ

Невозможно опросить двоичные сообщения с помощью `kafka-python`

У меня есть тема Kafka, которая получает двоичные данные (необработанные данные захвата пакетов). Я могу подтвердить, что это действительно данные посадки, используя инструменты Kafka CLI. Я получаю несколько сообщений каждую секунду. kafka-console-…
14 мар '17 в 19:24
1 ответ

Сообщения не доставляются с kafka python

Я пытаюсь настроить простое приложение Kafka с помощью kafka-python. Я пытался заставить некоторые примеры, которые я нашел в сети, работать, но, похоже, не смог этого сделать. У меня есть экземпляр kafka, работающий в контейнере Docker. Я протестир…
09 июл '17 в 13:29
1 ответ

Поддержка kafka-python ssl для python <v2.7.9 (без атрибута 'SSLContext')

При попытке подключиться с ssl к kafka используя kafka-python Я получаю следующую ошибку: Traceback (most recent call last): File "server.py", line 23, in &lt;module&gt; kafka_producer = SimpleKafkaProducer() File "/my-service/kafka_producer.py", li…
06 июл '17 в 09:26
1 ответ

python-kafka: возможно ли для потребителя пропускать сообщения на основе атрибутов сообщения?

Дан набор сообщений, каждое с атрибутом приоритета. Возможно ли для потребителя пропускать или игнорировать сообщения, атрибут приоритета которых недостаточно высок? Сначала я десериализировал сообщения, используемые потребителем kafka-python, прове…
10 ноя '16 в 13:16
1 ответ

Опубликовать сообщение в облачном кластере Confluent от kafka-python

Я использую пакет kafka-python для публикации сообщений в облачном кластере kafka. Мой код выглядит следующим образом: from kafka import KafkaProducer producer=KafkaProducer( bootstrap_servers='pkc-epgnk.us-central1.gcp.confluent.cloud:9092', securi…
25 янв '19 в 14:11
3 ответа

Чтение только конкретных сообщений из темы кафки

Сценарий: Я записываю данные объекта JSON в тему kafka во время чтения. Я хочу прочитать только определенный набор сообщений, основанный на значении, присутствующем в сообщении. Я использую библиотеку kafka-python. примеры сообщений: {flow_status: "…
2 ответа

Может ли потребитель читать записи из раздела, в котором хранятся данные с определенным значением ключа?

Вместо того, чтобы создавать много тем, я создаю раздел для каждого потребителя и храню данные, используя ключ. Итак, есть ли способ заставить потребителя в группе потребителей читать из раздела, в котором хранятся данные определенного ключа. Если д…
1 ответ

Производитель Kafka, использующий python: TypeError: все полезные нагрузки сообщения продукта должны быть нулевыми или байтами типа

Я только начал изучать Python и Kafka. Это первый пример, который я пытался начать. http://www.giantflyingsaucer.com/blog/?p=5541 И я получил исключение: Traceback (most recent call last): File "producer.py", line 23, in &lt;module&gt; main() File "…
1 ответ

Постоянно потреблять Kafka и обновлять очередь через определенные промежутки времени, используя многопроцессорность

Я пытаюсь непрерывно потреблять события от кафки. Это же приложение также использует эти использованные данные для проведения некоторого анализа и обновления базы данных с интервалом в n секунд (допустим, n = 60 секунд). В том же приложении, если pr…