confluent-kafka библиотека Python consumer.poll(тайм-аут) не работает должным образом
Когда я установил msg = consumer.poll(timeout=10.0)
потребитель ждет 10 секунд и возвращается None
как и ожидалось, но когда я изменил это на msg = consumer.poll(timeout=3600.0)
этот потребитель просто вернулся None
сразу вместо ожидания 3600 секунд, как ожидалось. Я что-то здесь пропустил? вот полный код, если необходимо.
running = True
conf = {'bootstrap.servers': bootstrap_servers,
'group.id': 'foo',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'on_commit': commit_completed}
consumer = Consumer(conf)
def msg_process(msg):
print(f"key: {msg.key().decode('utf-8')}, value: {msg.value().decode('utf-8')}")
def basic_consume_loop(consumer, topics):
try:
consumer.subscribe(topics)
msg_count = 0
while running:
msg = consumer.poll(timeout=3600.0)
if msg is None:
print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: no new message")
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
print('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
# consumer.commit(async=False)
msg_process(msg)
msg_count += 1
if msg_count % MIN_COMMIT_COUNT == 0:
consumer.commit(async=True)
finally:
# Close down consumer to commit final offsets.
consumer.close()
def shutdown():
running = False
basic_consume_loop(consumer, [topic_user])