Пикафка - отправка сообщений и получение подтверждений асинхронно
PyKafka имеет ограничение, которое:
очередь отчетов о доставке является локальной для потока: она будет обслуживать отчеты только для сообщений, которые были созданы из текущего потока
Я пытаюсь написать скрипт, в котором я могу асинхронно отправлять сообщения, используя одну функцию, и продолжать получать подтверждения через другую функцию.
Вот функции:
def SendRequest(producer):
count=0
while True:
count += 1
producer.produce('test msg', partition_key='{}'.format(count))
if count == 50000:
endtime=datetime.datetime.now()
print "EndTime : ",endtime
print "Done sending all messages.Waiting for response now"
return
def GetResponse(producer):
count_response=0
while True:
try:
msg, exc = producer.get_delivery_report(block=False)
if exc is not None:
count_response+=1
print 'Failed to deliver msg {}: {}'.format(
msg.partition_key, repr(exc))
else:
print "Count Res :",count_response
count_response+=1
except Queue.Empty:
pass
except Exception,e:
print "Unhandled exception : ",e
Многопоточность и многопроцессорность не помогли. Эти две вышеупомянутые функции должны выполняться асинхронно / параллельно. Какой подход должен быть использован здесь?
1 ответ
Вопрос: где я могу асинхронно отправлять сообщения... и продолжать получать подтверждения
Это решение с asyncio.coroutine
будут удовлетворены ваши потребности.
Примечание: есть несколько недостатков!
- это
asyncio
код требует как минимум Python 3.5- Для каждого сообщения создается новая задача
Это реализует class AsyncProduceReport()
:
import asyncio
from pykafka import KafkaClient
import queue, datetime
class AsyncProduceReport(object):
def __init__(self, topic):
self.client = KafkaClient(hosts='127.0.0.1:9092')
self.topic = self.client.topics[bytes(topic, encoding='utf-8')]
self.producer = self.topic.get_producer(delivery_reports=True)
self._tasks = 0
# async
@asyncio.coroutine
def produce(self, msg, id):
print("AsyncProduceReport::produce({})".format(id))
self._tasks += 1
self.producer.produce(bytes(msg, encoding='utf-8'))
# await - resume next awaiting task
result = yield from self.get_delivery_report(id)
self._tasks -= 1
# This return values are passed to self.callback(task)
return id, result
def get_delivery_report(self, id):
"""
This part of a Task, runs as long as of receiving the delivery_report
:param id: ID of Message
:return: True on Success else False
"""
print("{}".format('AsyncProduceReport::get_delivery_report({})'.format(id)))
while True:
try:
msg, exc = self.producer.get_delivery_report(block=False)
return (not exc, exc)
except queue.Empty:
# await - resume next awaiting task
yield from asyncio.sleep(1)
@staticmethod
def callback(task):
"""
Processing Task Results
:param task: Holds the Return values from self.produce(...)
:return: None
"""
try:
id, result = task.result()
print("AsyncProduceReport::callback: Msg:{} delivery_report:{}"
.format(id, result))
except Exception as e:
print(e)
def ensure_futures(self):
"""
This is the first Task
Creates a new taks for every Message
:return: None
"""
# Create 3 Tasks for this testcase
for id in range(1, 4):
# Schedule the execution of self.produce(id): wrap it in a future.
# Return a Task object.
# The task will resumed at the next await
task = asyncio.ensure_future(self.produce('test msg {} {}'
.format(id, datetime.datetime.now()), id))
# Add a Result Callback function
task.add_done_callback(self.callback)
# await - resume next awaiting task
# This sleep value could be 0 - Only for this testcase == 5
# Raising this value, will give more time for waiting tasks
yield from asyncio.sleep(5)
# print('Created task {}...'.format(_id))
# await - all tasks completed
while self._tasks > 0:
yield from asyncio.sleep(1)
Использование:
if __name__ == '__main__':
client = AsyncProduceReport('topic01')
loop = asyncio.get_event_loop()
loop.run_until_complete(client.ensure_futures())
loop.close()
print("{}".format('EXIT main()'))
Qutput:
AsyncProduceReport::produce(1) AsyncProduceReport::get_delivery_report(1) AsyncProduceReport::produce(2) AsyncProduceReport::get_delivery_report(2) AsyncProduceReport::callback: Msg:1 delivery_report:(True, None) AsyncProduceReport::produce(3) AsyncProduceReport::get_delivery_report(3) AsyncProduceReport::callback: Msg:2 delivery_report:(True, None) AsyncProduceReport::callback: Msg:3 delivery_report:(True, None)
Протестировано с Python:3.5.3 - Пикафка:2.7.0