PyKafka не может подключиться к разделенной теме кафки
apache-storm-1.0.2
hbase-1.2.1
kafka_2.10-0.10.0.0
zookeeper-3.4.9
опс:
zkServer.sh start
$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties &!
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 4 --topic project
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
logons
metrics
notifications
ticks
project
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic project
Topic:project PartitionCount:4 ReplicationFactor:1 Configs:
Topic: project Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: project Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: project Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: project Partition: 3 Leader: 0 Replicas: 0 Isr: 0
zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/dmitry/Development/zookeeper/bin/../conf/zoo.cfg
Mode: standalone
java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
uname -a
Linux dmitry.npb.io 4.9.13-200.fc25.x86_64
Теперь я запускаю следующий код на двух разных панелях tmux:
ffrom pykafka import KafkaClient, SslConfig
from pykafka.exceptions import ConsumerStoppedException, PartitionOwnedError
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b'project']
try:
messages = topic.get_balanced_consumer(
consumer_group=b'project_bigtable',
zookeeper_connect='localhost:2181')
except (ConsumerStoppedException, PartitionOwnedError):
print('No connection')
messages.stop()
messages.start()
print('Connected')
for message in messages:
if message is not None:
print(message.offset, message.value)
первый запуск - без проблем второй запуск в другой панели, окне или терминале
Я получаю следующую трассировку:
python consumer.py
Failed to acquire partition <pykafka.partition.Partition at 0x7fb8cb9b3cf8 (id=3)> after 4 retries.
Stopping consumer in response to error
Traceback (most recent call last):
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 633, in _add_partitions
ephemeral=True
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/client.py", line 834, in create
sequence=sequence, makepath=makepath).get()
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 78, in get
raise self._exception
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 206, in captured_function
return function(*args, **kwargs)
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 224, in captured_function
value = function(*args, **kwargs)
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/client.py", line 889, in create_completion
return self.unchroot(result.get())
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/kazoo/handlers/utils.py", line 72, in get
raise self._exception
kazoo.exceptions.NodeExistsError: ((), {})
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 306, in start
self._rebalance()
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 603, in _rebalance
self._update_member_assignment()
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 577, in _update_member_assignment
self._add_partitions(new_partitions - current_zk_parts)
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 636, in _add_partitions
raise PartitionOwnedError(p)
pykafka.exceptions.PartitionOwnedError
Connected
Traceback (most recent call last):
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 731, in consume
message = self._consumer.consume(block=block)
AttributeError: 'NoneType' object has no attribute 'consume'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "consumer.py", line 18, in <module>
for message in messages:
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 745, in __iter__
message = self.consume(block=True)
File "/home/dmitry/Projects/project/storm/.venv/storm/lib/python3.5/site-packages/pykafka/balancedconsumer.py", line 734, in consume
raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException
- мои требования к виртуальной среде.txt
- мой кафка server.properties
- мой зоопарк zoo.cfg
Я использую Fedora 25 / Python 3.5.2
1 ответ
Я решил пойти с kafka-python, так как он работал без глюков с разделенными темами kafka без глюков, которые я имел с pykafka
это мой рабочий код:
from kafka import KafkaConsumer
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('trivver',
group_id='trivver_bigtable',
bootstrap_servers=['localhost:9092'])
for message in consumer:
# message value and key are raw bytes -- decode if necessary!
# e.g., for unicode: `message.value.decode('utf-8')`
print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
хорошие вещи:
- Мне не нужно указывать ссылку на zookeeper в init для потребителя, он правильно балансирует работников
- когда у меня чрезмерное количество рабочих (выше доступных разделов в теме кафки) - это просто ожидание без сбоя с Exception (как в случае с pykafka)
- нет проблем с присоединением к новым рабочим, нет сбоев в течение 24-часового периода тестирования с большой нагрузкой kafka, нет заметных утечек памяти
в нем реализован десериализатор msgpack из коробки (это то что мне нужно)
KafkaConsumer(value_deserializer=msgpack.unpackb)