Моя очередь HelloWorld работает?
Я собираюсь использовать этот дизайн в приложении, но я довольно плохо знаком с многопоточностью и очередями в Python. Очевидно, что реальное приложение не для того, чтобы сказать "привет", но дизайн тот же - то есть есть процесс, который требует некоторого времени на настройку и демонтаж, но я могу выполнить несколько задач за один удар. Задачи будут приходить в случайные моменты времени и часто в виде очередей.
Это разумный и безопасный дизайн?
class HelloThing(object):
def __init__(self):
self.queue = self._create_worker()
def _create_worker(self):
import threading, Queue
def worker():
while True:
things = [q.get()]
while True:
try:
things.append(q.get_nowait())
except Queue.Empty:
break
self._say_hello(things)
[q.task_done() for task in xrange(len(things))]
q = Queue.Queue()
n_worker_threads = 1
for i in xrange(n_worker_threads):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
return q
def _say_hello(self, greeting_list):
import time, sys
# setup stuff
time.sleep(1)
# do some things
sys.stdout.write('hello {0}!\n'.format(', '.join(greeting_list)))
# tear down stuff
time.sleep(1)
if __name__ == '__main__':
print 'enter __main__'
import time
hello = HelloThing()
hello.queue.put('world')
hello.queue.put('cruel world')
hello.queue.put('stack overflow')
time.sleep(2)
hello.queue.put('a')
hello.queue.put('b')
time.sleep(2)
for i in xrange(20):
hello.queue.put(str(i))
#hello.queue.join()
print 'finish __main__'
1 ответ
Безопасность потока обрабатывается реализацией очереди (также вы должны обращаться с ней в своей реализации _say_hello, если это требуется).
Проблема обработчика пакета: Пакет должен обрабатываться только одним потоком (например: скажем, установка / разрыв вашего процесса занимает 10 секунд; в секунду 1 все потоки будут заняты пакетом со секунды 0, во второй 5 - новая задача (или взрыв) но нет потока, чтобы справиться с ними / это). Таким образом, пакет должен быть определен максимальным количеством задач (или может быть "бесконечным") для определенного временного окна. Запись в очереди должна быть списком задач.
Как вы можете группировать список задач? Я предоставляю решение в виде кода, более легкого для объяснения...
producer_q = Queue()
def _burst_thread():
while True:
available_tasks = [producer_q.get()]
time.sleep(BURST_TIME_WINDOW)
available_tasks.extend(producer_q.get() # I'm the single consumer, so will be at least qsize elements
for i in range(producer_q.qsize()))
consumer_q.push(available_tasks)
Если вы хотите иметь максимум сообщений в пакете, вам просто нужно разделить available_tasks в нескольких списках.