Какое разделение использовать при выполнении поиска смещения с помощью AIOKafkaConsumer?
При попытке позволить
AIOKafkaConsumer
начать читать сообщения с определенного смещения
starting_offset
, как узнать, какой раздел использовать?
Я пытаюсь использовать AIOKafkaConsumer.seek
, но для этого требуется указать TopicPartition в.
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
async def main():
topic = "test"
starting_offset = 3
# Publish some messages
producer = AIOKafkaProducer(bootstrap_servers="localhost:29092")
await producer.start()
for i in range(10):
await producer.send_and_wait(topic, bytes(f"hello {i}", "utf-8"))
# Start consuming from a specific offset
consumer = AIOKafkaConsumer(topic, bootstrap_servers="localhost:29092")
await consumer.start()
consumer.seek(None, starting_offset)
while True:
message = await consumer.getone()
print("message:", message.value)
if __name__ == "__main__":
asyncio.run(main())
1 ответ
У вашей темы только один раздел? Если да, то используйте
1
... В противном случае однозначного ответа на этот вопрос нет.
Разделы имеют индивидуальные значения смещения. Вы можете искать все разделы с одним и тем же смещением, но не гарантируется, что это будет существовать для всех, и вам сначала нужно будет перебрать диапазон номеров разделов (см.
partitions_for_topic(topic)
) искать их индивидуально.