confluent-kafka библиотека Python consumer.poll(тайм-аут) не работает должным образом

Когда я установил msg = consumer.poll(timeout=10.0) потребитель ждет 10 секунд и возвращается None как и ожидалось, но когда я изменил это на msg = consumer.poll(timeout=3600.0) этот потребитель просто вернулся Noneсразу вместо ожидания 3600 секунд, как ожидалось. Я что-то здесь пропустил? вот полный код, если необходимо.

running = True
conf = {'bootstrap.servers': bootstrap_servers,
        'group.id': 'foo',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,
        'on_commit': commit_completed}
consumer = Consumer(conf)


def msg_process(msg):
    print(f"key: {msg.key().decode('utf-8')}, value: {msg.value().decode('utf-8')}")


def basic_consume_loop(consumer, topics):
    try:
        consumer.subscribe(topics)

        msg_count = 0
        while running:
            msg = consumer.poll(timeout=3600.0)
            if msg is None:
                print(f"{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: no new message")
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    print('%% %s [%d] reached end at offset %d\n' %
                          (msg.topic(), msg.partition(), msg.offset()))
                elif msg.error():
                    raise KafkaException(msg.error())
            else:
                # consumer.commit(async=False)
                msg_process(msg)
                msg_count += 1
                if msg_count % MIN_COMMIT_COUNT == 0:
                    consumer.commit(async=True)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()


def shutdown():
    running = False


basic_consume_loop(consumer, [topic_user])

1 ответ

Наверное потому чтоfetch.max.wait.msнастройка ниже, чем передано poll().

Другие вопросы по тегам