Почему я получил ошибки PartitionOwnedError и ConsumerStoppedException при запуске нескольких потребителей
Я использую pykafka, чтобы получить сообщение из темы kafka, а затем выполнить некоторый процесс и обновить его до mongodb. Поскольку pymongodb может обновлять только один элемент каждый раз, я запускаю 100 процессов. Но при запуске некоторые процессы вызывали ошибки "PartitionOwnedError и ConsumerStoppedException". Я не знаю почему. Спасибо.
kafka_cfg = conf['kafka']
kafka_client = KafkaClient(kafka_cfg['broker_list'])
topic = kafka_client.topics[topic_name]
balanced_consumer = topic.get_balanced_consumer(
consumer_group=group,
auto_commit_enable=kafka_cfg['auto_commit_enable'],
zookeeper_connect=kafka_cfg['zookeeper_list'],
zookeeper_connection_timeout_ms = kafka_cfg['zookeeper_conn_timeout_ms'],
consumer_timeout_ms = kafka_cfg['consumer_timeout_ms'],
)
while(1):
for msg in balanced_consumer:
if msg is not None:
try:
value = eval(msg.value)
id = long(value.pop("id"))
value["when_update"] = datetime.datetime.now()
query = {"_id": id}}
result = collection.update_one(query, {"$set": value}, True)
except Exception, e:
log.error("Fail to update: %s, msg: %s", e, msg.value)
>
Traceback (most recent call last):
File "dump_daily_summary.py", line 182, in <module>
dump_daily_summary.run()
File "dump_daily_summary.py", line 133, in run
for msg in self.balanced_consumer:
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
message = self.consume(block=True)
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 734, in consume
raise ConsumerStoppedException
pykafka.exceptions.ConsumerStoppedException
>
Traceback (most recent call last):
File "dump_daily_summary.py", line 182, in <module>
dump_daily_summary.run()
File "dump_daily_summary.py", line 133, in run
for msg in self.balanced_consumer:
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 745, in __iter__
message = self.consume(block=True)
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 726, in consume
self._raise_worker_exceptions()
File "/data/share/python2.7/lib/python2.7/site-packages/pykafka-2.5.0.dev1-py2.7-linux-x86_64.egg/pykafka/balancedconsumer.py", line 271, in _raise_worker_exceptions
raise ex
pykafka.exceptions.PartitionOwnedError
3 ответа
PartitionOwnedError: проверьте, есть ли какой-либо фоновый процесс, потребляющий в той же самой группе потребителей, возможно, не хватает доступных разделов для запуска другого потребителя.
ConsumerStoppedException: вы можете попробовать обновить версию pykafka ( https://github.com/Parsely/pykafka/issues/574).
На это поведение влияет давняя ошибка, которая была недавно обнаружена и в настоящее время исправляется. Обходной путь, который мы использовали в производственном процессе на Parse.ly, - запускать наших потребителей в среде, которая автоматически перезапускает их при сбое с этими ошибками, пока все разделы не будут принадлежать.
Я встретил ту же проблему, что и ты. Но я запутался в других решениях, таких как добавление достаточного количества разделов для потребителей или обновление версии pykafka. На самом деле, мои условия удовлетворяют вышеуказанным условиям.
Вот версия инструментов:
Python 2.7.10
Кафка 2.11-0.10.0.0
зоопарк 3.4.8
пикафка 2.5.0
Вот мой код:
class KafkaService(object):
def __init__(self, topic):
self.client_hosts = get_conf("kafka_conf", "client_host", "string")
self.topic = topic
self.con_group = topic
self.zk_connect = get_conf("kafka_conf", "zk_connect", "string")
def kafka_consumer(self):
"""kafka-consumer client, using pykafka
:return: {"id": 1, "url": "www.baidu.com", "sitename": "baidu"}
"""
from pykafka import KafkaClient
consumer = ""
try:
kafka = KafkaClient(hosts=str(self.client_hosts))
topic = kafka.topics[self.topic]
consumer = topic.get_balanced_consumer(
consumer_group=self.con_group,
auto_commit_enable=True,
zookeeper_connect=self.zk_connect,
)
except Exception as e:
logger.error(str(e))
while True:
message = consumer.consume(block=False)
if message:
print "message:", message.value
yield message.value
Два исключения ( ConsumerStoppedException и PartitionOwnedError) вызываются функцией consum(block=True) pykafka.balancedconsumer.
Конечно, я рекомендую вам прочитать исходный код этой функции.
Существует аргумент block = True, после изменения его на False программа не может попасть в исключения.
Тогда потребители кафки работают нормально.