Описание тега confluent-kafka-python

Confluent Kafka Python — это эффективная реализация производителей, потребителей и клиента администрирования Kafka на Python, основанная на librdkafka.
0 ответов

Confluent Kafka Producer для инъекции сообщений AVRO - нужны примеры

У меня есть требование сделать следующее. (Для этого мне нужно использовать конфлюентную кафку) Преобразуйте данные JSON в формат AVRO (используя схему Avro). Внедрить данные AVRO в темы Kafka. Итак, не могли бы вы помочь мне предоставить несколько …
29 июн '20 в 13:38
0 ответов

Ошибка Python после обновления библиотеки confluent-kafka

Я обновил confluent-kafka с 1.0.1 до 1.4.2. После обновления, если я выполняю команду "pip install", я получаю следующую ошибку. Traceback (most recent call last): import confluent_kafka as kafka File "/home/sshil/virtual_envs/ma_venv/local/lib…
25 авг '20 в 09:40
2 ответа

Как определить, существует ли тема kafka с помощью confluent-kafka-python

Я использую пакет confluent-kafka-python для взаимодействия с сервером Kafka. Я могу успешно создать тему и подтолкнуть к ней события. Однако моя проблема заключается в том, что я запускаю несколько узлов (работающих в Docker), если второй экземпляр…
1 ответ

Как программно обновить схему темы и совместимость в реестре схем Confluent

У меня есть схема, уже зарегистрированная в реестре схем, что я смог сделать, используя register() нравится, from schema_registry.client import SchemaRegistryClient, schema subject_name = "new-schema" schema_url = "https://{{ schemaRegistry }}:8081"…
1 ответ

Ошибка Confluent Kafka Python: сбой запроса метаданных

Попадание ниже ошибки. Не уверен, что случилось. %5|1591739081.630|REQTMOUT|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/290: Timed out HeartbeatRequest in flight (after 10622ms, timeout #0) %4|1591739081.630|REQTMOUT|rdkafka#consum…
1 ответ

Как расшифровать пользовательское зашифрованное сообщение в Python Confluent_Kafka с помощью key.deserializer и value.deserializer?

Моя команда использует confluent_kafka для потоковой передачи в реальном времени и помещается в очередь с использованием настраиваемого шифрования. В основе технологии лежит Java. Я пытаюсь получить сообщения с помощью python из темы и могу это сдел…
0 ответов

Потребитель Kafka не может получить сообщение, даже если он подписан на тему

Я подписываюсь на тему и пытаюсь получать сообщения, однако, хотя я вижу, что потребитель подписан на эту тему, я не получал сообщений с помощью consumer.consume(cts.token). код просто останавливается на этом. Может ли быть этому причина?
0 ответов

Потеря данных Producer-> Kafka-> Nifi

Пишу сообщения в кафку из csv файлов. Мой продюсер говорит, что все данные производятся по теме Kafka. Наряду с этим я использую apache nifi в качестве потребителя для темы kafka (процессор ConsumeKafka_2_0). Если я передал данные в kafka в один пот…
0 ответов

Kafka Потребительское отставание не равно нулю

У меня есть кластер Kafka с 40 разделами, и я опубликовал несколько миллионов сообщений. У меня есть пул потребителей, и по большей части потребительское отставание < 100. После моего эксперимента я перестал публиковать какие-либо новые сообщения и …
0 ответов

Не могу создавать и использовать сообщение kafka с моего локального компьютера на виртуальную машину ubuntu

Я пытаюсь создать сообщение kafka с моего локального компьютера на Ubuntu vm. Команда Telnet сообщает об успешном подключении к хосту. PS C:\Users\harshal&gt; Telnet harshal-VirtualBox 9092 Successfully connected to Host: &quot;harshal-VirtualBox&qu…
1 ответ

Как отлаживать AvroConsumer в конфлюентной кафке?

Я пытаюсь прочитать Kafka из Python, но получаю сообщение "Нет", "Нет ошибок в интерфейсе командной строки". Я использую переадресацию портов на целевой хост через замазку, а затем тестирую порты через telnet - все работает нормально. Более того, я …
1 ответ

Потребляйте из кафки без бесконечного цикла

В настоящее время я использую клиент Python Confluent kafka для получения сообщений из темы kafka, и код отлично работает внутри while Trueцикл, как показано в примерах в документации. Однако я хотел бы настроить задание cron, которое потребляет из …
25 авг '20 в 17:19
2 ответа

Некоторые потребители Python Confluent Kafka остаются простаивающими / неназначенными, хотя другие перегружены / переназначены

Настроить: 120 потребителей python confluent-kafka, которые подписываются на один и тот же набор тем. 8 тем с разным количеством разделов: 1 тема с 84 разделами, несколько тем с 40-50 разделами и остальные с 1-10 разделами. Общее количество перегоро…
0 ответов

Confluent Kafka AdminClient зайдет в бесконечный цикл, если предоставлена ​​неправильная конфигурация

Версия: confluent-kafka==1.3.0 Ссылка: https://docs.confluent.io/current/clients/confluent-kafka-python/ Я заметил странное поведение при создании объекта AdminClient в python и заметил, что он будет работать в бесконечном цикле, когда указана непра…
0 ответов

библиотека python confluent-kafka не работает с ubutu14 и python3

Я использую confluent-kafka==1.0.1. Он отлично работает, когда я использую py3 и ubuntu18, но не работает с py3 и ubuntu14. Я получаю следующую ошибку. Traceback (most recent call last): File &quot;/usr/local/lib/python3.4/dist-packages/metrics_agen…
1 ответ

confluent-kafka библиотека Python consumer.poll(тайм-аут) не работает должным образом

Когда я установил msg = consumer.poll(timeout=10.0) потребитель ждет 10 секунд и возвращается None как и ожидалось, но когда я изменил это на msg = consumer.poll(timeout=3600.0) этот потребитель просто вернулся Noneсразу вместо ожидания 3600 секунд,…
0 ответов

ошибка схемы производителя confluent kafka avro

Я использую пример кода из https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_producer.py для загрузки данных в тему. Я сделал только одно изменение, и я добавил "default": null в каждое поле для совместимости схемы. Он…
2 ответа

Ошибка при использовании библиотеки python confluent-kafka с AWS lambda

Я пытаюсь использовать библиотеку python confluent-kafka для администрирования моего кластера с помощью лямбда-функции, но функция не работает с ошибкой: &quot;Unable to import module 'Test': No module named 'confluent_kafka.cimpl'&quot; Мои требова…
0 ответов

многопроцессорный клиентский клиент Python Kafka не получает сообщения

Клиент -потребитель Python работал нормально, когда работал как автономный, но не мог получать сообщения при запуске как многопроцессорный рабочий с той же конфигурацией. Клиенты всегда печатают сообщение в блоке, где он получает None для msg. Очень…
1 ответ

Kafka Consumer не использует с последнего зафиксированного смещения после перезапуска

У меня есть опрос потребителей из подписанной темы. Он потребляет каждое сообщение и выполняет некоторую обработку (в течение нескольких секунд), переходит к другой теме и фиксирует смещение. Всего 5000 сообщений, перед перезапуском - израсходовано …