Какое разделение использовать при выполнении поиска смещения с помощью AIOKafkaConsumer?

При попытке позволить AIOKafkaConsumer начать читать сообщения с определенного смещения starting_offset, как узнать, какой раздел использовать?

Я пытаюсь использовать AIOKafkaConsumer.seek, но для этого требуется указать TopicPartition в.

      import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer


async def main():
    topic = "test"
    starting_offset = 3
    
    # Publish some messages
    producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
    await producer.start()
    for i in range(10):
        await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))

    # Start consuming from a specific offset
    consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
    await consumer.start()
    consumer.seek(None, starting_offset)

    while True:
        message = await consumer.getone()
        print("message:", message.value)


if __name__ == "__main__":
    asyncio.run(main())

1 ответ

У вашей темы только один раздел? Если да, то используйте 1... В противном случае однозначного ответа на этот вопрос нет.

Разделы имеют индивидуальные значения смещения. Вы можете искать все разделы с одним и тем же смещением, но не гарантируется, что это будет существовать для всех, и вам сначала нужно будет перебрать диапазон номеров разделов (см. partitions_for_topic(topic)) искать их индивидуально.

Другие вопросы по тегам