Невозможно опросить двоичные сообщения с помощью `kafka-python`
У меня есть тема Kafka, которая получает двоичные данные (необработанные данные захвата пакетов). Я могу подтвердить, что это действительно данные посадки, используя инструменты Kafka CLI. Я получаю несколько сообщений каждую секунду.
kafka-console-consumer.sh --zookeeper svr:2181 --topic test
Но когда я использую kafka-python, я не могу получить никаких сообщений. poll
Метод просто не возвращает результатов.
(Pdb) consumer = kafka.KafkaConsumer("test", bootstrap_servers=["svr:9092"])
(Pdb) consumer.poll(5000)
{}
Я был в состоянии использовать kafka-python для извлечения сообщений из отдельной темы, которая содержит только текстовые строки.
Мне любопытно, как-то внутренне kafka-python отбрасывает сообщения, потому что они являются двоичными и не проходят какую-то проверку. Как я могу копнуть глубже и понять, почему не могут быть получены сообщения?
1 ответ
Проблема заключалась в том, что данные, отправленные в тему, использовали быстрое сжатие. Все, что мне нужно было сделать, это установить дополнительный модуль для обработки snappy.
pip install python-snappy
К сожалению, используя код, который я изложил в этом вопросе, он просто не возвращает данных, вместо того, чтобы сказать мне, что проблема связана со сжатием.
Для сравнения я использовал более старый потребительский API, который правильно сообщает о проблеме и привел меня к этому решению.
>>> client = kafka.SimpleClient("svr:9092")
>>> consumer.close()
>>> consumer = kafka.SimpleConsumer(client, "group", "test")
>>> for message in consumer:
... print(message)
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 353, in __iter__
message = self.get_message(True, timeout)
File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 305, in get_message
return self._get_message(block, timeout, get_partition_info)
File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 320, in _get_message
self._fetch()
File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 379, in _fetch
fail_on_error=False
File "/usr/lib/python2.7/site-packages/kafka/client.py", line 665, in send_fetch_request
KafkaProtocol.decode_fetch_response)
File "/usr/lib/python2.7/site-packages/kafka/client.py", line 295, in _send_broker_aware_request
for payload_response in decoder_fn(future.value):
File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 212, in decode_fetch_response
for partition, error, highwater_offset, messages in partitions
File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 219, in decode_message_set
inner_messages = message.decompress()
File "/usr/lib/python2.7/site-packages/kafka/protocol/message.py", line 121, in decompress
assert has_snappy(), 'Snappy decompression unsupported'
AssertionError: Snappy decompression unsupported