Постоянно потреблять Kafka и обновлять очередь через определенные промежутки времени, используя многопроцессорность
Я пытаюсь непрерывно потреблять события от кафки. Это же приложение также использует эти использованные данные для проведения некоторого анализа и обновления базы данных с интервалом в n секунд (допустим, n = 60 секунд).
В том же приложении, если process1 = Kafka Consumer , process2= Data Analysis and database update logic.
process1 is to be run continuously
process2 is to be executed once every n=60 seconds
process2
занимается вычислениями и обновлением базы данных, поэтому выполнение займет 5-10 секунд. я не хочу process1
останавливаться во время process2
выполняется. Следовательно, я использую multiprocessing module
(process1,process2
было бы thread1,thread2
если бы я использовал Threading
модуль в Python, но из-за того, что я прочитал о GIL и Threading
модуль не в состоянии использовать многоядерную архитектуру, я решил пойти с multiprocessing
модуль.) для достижения параллелизма в этом случае. (Если мое понимание GIL
или же Threading
указанные выше ограничения модуля неверны, мои извинения и, пожалуйста, не стесняйтесь исправлять меня).
Приложение, которое у меня есть, имеет довольно простое взаимодействие между двумя процессами, где process1
просто заполняет очередь всеми полученными сообщениями за 60 секунд, а по истечении 60 секунд просто передает все сообщения process2
,
У меня проблемы с этой логикой передачи. Как мне перенести содержимое очереди из process1
в process2
(Я предполагаю, что это будет основной процесс или другой процесс? Это еще один вопрос, должен ли я создавать 2 процесса в дополнение к основному процессу?) В конце 60 секунд, а затем очищать содержимое очереди, чтобы он снова запускался другая итерация.
Пока у меня есть следующее:
import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue
def kafka_init():
client=KafkaClient('kafka1.wpit.nile.works')
consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
return consumer
def consumeMessages(q):
print "thread started"
while not q.empty():
try:
print q.get(True,1)
Queue.Empty:
break
print "thread ended"
if __name__=="__main__":
starttime=time.time()
timeout=starttime+ 10 #timeout of read in seconds
consumer=kafka_init()
q=Queue()
p=Process(target=consumeMessages,args=q)
while(True):
q.put(consumer.get_message())
if time.time()>timeout:
#transfer logic from process1 to main process here.
print "Start time",starttime
print "End time",time.time()
p.start()
p.join()
break
Любая помощь приветствуется.
1 ответ
Проблема, с которой вы сталкиваетесь, не является специфичной для kafka, поэтому я собираюсь использовать общие "сообщения", которые просто являются целочисленными.
Мне кажется, что основная проблема заключается в том, что, с одной стороны, вы хотите обрабатывать сообщения сразу после их создания, а с другой - обновлять базу данных каждые 60 секунд.
Если вы используете q.get()
по умолчанию этот вызов метода блокируется до тех пор, пока в очереди не появится сообщение. Это может занять более 60 секунд, что приведет к задержке обновления базы данных. Таким образом, мы не можем использовать блокировку q.get
, Нам нужно использовать q.get
с таймаутом, чтобы звонок не блокировался:
import time
import multiprocessing as mp
import random
import Queue
def process_messages(q):
messages = []
start = time.time()
while True:
try:
message = q.get(timeout=1)
except Queue.Empty:
pass
else:
messages.append(message)
print('Doing data analysis on {}'.format(message))
end = time.time()
if end-start > 60:
print('Updating database: {}'.format(messages))
start = end
messages = []
def get_messages(q):
while True:
time.sleep(random.uniform(0,5))
message = random.randrange(100)
q.put(message)
if __name__ == "__main__":
q = mp.Queue()
proc1 = mp.Process(target=get_messages, args=[q])
proc1.start()
proc2 = mp.Process(target=process_messages, args=[q])
proc2.start()
proc1.join()
proc2.join()
производит вывод, такой как:
Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]