Постоянно потреблять Kafka и обновлять очередь через определенные промежутки времени, используя многопроцессорность

Я пытаюсь непрерывно потреблять события от кафки. Это же приложение также использует эти использованные данные для проведения некоторого анализа и обновления базы данных с интервалом в n секунд (допустим, n = 60 секунд).

В том же приложении, если process1 = Kafka Consumer , process2= Data Analysis and database update logic.

process1 is to be run continuously
process2 is to be executed once every n=60 seconds 

process2 занимается вычислениями и обновлением базы данных, поэтому выполнение займет 5-10 секунд. я не хочу process1 останавливаться во время process2 выполняется. Следовательно, я использую multiprocessing module (process1,process2 было бы thread1,thread2 если бы я использовал Threading модуль в Python, но из-за того, что я прочитал о GIL и Threading модуль не в состоянии использовать многоядерную архитектуру, я решил пойти с multiprocessing модуль.) для достижения параллелизма в этом случае. (Если мое понимание GIL или же Threading указанные выше ограничения модуля неверны, мои извинения и, пожалуйста, не стесняйтесь исправлять меня).

Приложение, которое у меня есть, имеет довольно простое взаимодействие между двумя процессами, где process1 просто заполняет очередь всеми полученными сообщениями за 60 секунд, а по истечении 60 секунд просто передает все сообщения process2,

У меня проблемы с этой логикой передачи. Как мне перенести содержимое очереди из process1 в process2 (Я предполагаю, что это будет основной процесс или другой процесс? Это еще один вопрос, должен ли я создавать 2 процесса в дополнение к основному процессу?) В конце 60 секунд, а затем очищать содержимое очереди, чтобы он снова запускался другая итерация.

Пока у меня есть следующее:

import sys
from kafka.client import KafkaClient
from kafka import SimpleConsumer
import time
from multiprocessing import Process,Queue

def kafka_init():
    client=KafkaClient('kafka1.wpit.nile.works')
    consumer=SimpleConsumer(client, "druidkafkaconsumer", "personalization.targeting.clickstream.prod")
    return consumer

def consumeMessages(q):
    print "thread started"
    while not q.empty():
        try:
            print q.get(True,1)
        Queue.Empty:
            break
    print "thread ended"
if __name__=="__main__":
    starttime=time.time()
    timeout=starttime+ 10 #timeout of read in seconds
    consumer=kafka_init()
    q=Queue()
    p=Process(target=consumeMessages,args=q)
    while(True):
        q.put(consumer.get_message())
        if time.time()>timeout:
            #transfer logic from process1 to main process here.
            print "Start time",starttime
            print "End time",time.time()
            p.start()
            p.join()
            break

Любая помощь приветствуется.

1 ответ

Решение

Проблема, с которой вы сталкиваетесь, не является специфичной для kafka, поэтому я собираюсь использовать общие "сообщения", которые просто являются целочисленными.

Мне кажется, что основная проблема заключается в том, что, с одной стороны, вы хотите обрабатывать сообщения сразу после их создания, а с другой - обновлять базу данных каждые 60 секунд.

Если вы используете q.get()по умолчанию этот вызов метода блокируется до тех пор, пока в очереди не появится сообщение. Это может занять более 60 секунд, что приведет к задержке обновления базы данных. Таким образом, мы не можем использовать блокировку q.get, Нам нужно использовать q.get с таймаутом, чтобы звонок не блокировался:

import time
import multiprocessing as mp
import random
import Queue

def process_messages(q):
    messages = []
    start = time.time()
    while True:
        try:
            message = q.get(timeout=1)
        except Queue.Empty:
            pass
        else:
            messages.append(message)
            print('Doing data analysis on {}'.format(message))
        end = time.time()
        if end-start > 60:
            print('Updating database: {}'.format(messages))
            start = end
            messages = []

def get_messages(q):
    while True:
        time.sleep(random.uniform(0,5))
        message = random.randrange(100)
        q.put(message)

if __name__ == "__main__":
    q = mp.Queue()

    proc1 = mp.Process(target=get_messages, args=[q])
    proc1.start()

    proc2 = mp.Process(target=process_messages, args=[q])
    proc2.start()

    proc1.join()
    proc2.join()

производит вывод, такой как:

Doing data analysis on 38
Doing data analysis on 8
Doing data analysis on 8
Doing data analysis on 66
Doing data analysis on 37
Updating database: [38, 8, 8, 66, 37]
Doing data analysis on 27
Doing data analysis on 47
Doing data analysis on 57
Updating database: [27, 47, 57]
Doing data analysis on 85
Doing data analysis on 90
Doing data analysis on 86
Doing data analysis on 22
Updating database: [85, 90, 86, 22]
Doing data analysis on 8
Doing data analysis on 92
Doing data analysis on 59
Doing data analysis on 40
Updating database: [8, 92, 59, 40]
Другие вопросы по тегам