Python ThreadPool с ограниченным размером очереди задач
Моя проблема заключается в следующем: у меня есть multiprocessing.pool.ThreadPool
объект с worker_count
рабочие и основные pqueue
из которого я кормлю задачи в пул.
Процесс выглядит следующим образом: есть основной цикл, который получает элемент level
уровень от pqueue
и отправляет его в пул с помощью apply_async
, Когда элемент обрабатывается, он генерирует элементы level + 1
, Проблема в том, что пул принимает все задачи и обрабатывает их в порядке их отправки.
Точнее, происходит то, что level 0
элементы обрабатываются и каждый генерирует 100 level 1
элементы, которые немедленно получены из pqueue
и добавили в пул, каждый level 1
пункт производит 100 level 2
элементы, которые передаются в пул и т. д., и элементы обрабатываются в BFS-режиме.
Мне нужно сказать пулу не принимать больше, чем worker_count
предметы, чтобы дать возможность получить более высокий уровень из pqueue
для обработки элементов в манере DFS.
Текущее решение, с которым я пришел: для каждой отправленной задачи сохраните AsyncResult
объект в asyncres_list
список, а также перед извлечением элементов из pqueue
Я удаляю элементы, которые были обработаны (если есть), проверяю длину asyncres_list
меньше, чем количество потоков в пуле каждые 0,5 секунды, и только так thread_number
элементы будут обработаны в то же время.
Мне интересно, есть ли более чистый способ достижения такого поведения, и я не могу найти в документации некоторые параметры для ограничения максимального количества задач, которые могут быть переданы в пул.
2 ответа
ThreadPool
это простой инструмент для общей задачи. Если вы хотите сами управлять очередью, чтобы получить поведение DFS; Вы могли бы реализовать необходимую функциональность на вершине threading
а также queue
модули напрямую.
Чтобы предотвратить планирование следующей корневой задачи до тех пор, пока все задачи, порожденные текущей задачей, не будут выполнены (в порядке "DFS"), вы можете использоватьQueue.join()
:
#!/usr/bin/env python3
import queue
import random
import threading
import time
def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
for task in iter(q.get, None): # blocking get until None is received
try:
if len(task) < maxlevel:
for i in range(multiplicity):
q.put(task + str(i)) # schedule the next level
time.sleep(random.random()) # emulate some work
with lock:
print(task)
finally:
q.task_done()
worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
for _ in range(worker_count)]
for t in threads:
t.start()
for task in "01234": # populate the first level
q.put(task)
q.join() # block until all spawned tasks are done
for _ in threads: # signal workers to quit
q.put(None)
for t in threads: # wait until workers exit
t.join()
Пример кода получен из примера в queue
модульная документация.
Задача на каждом уровне порождает multiplicity
прямые дочерние задачи, которые порождают свои собственные подзадачи до maxlevel
достигнуто
None
используется, чтобы сигнализировать рабочим, что они должны уйти. t.join()
используется, чтобы ждать, пока потоки не выйдут изящно. Если основной поток прерывается по какой-либо причине, то потоки демона уничтожаются, если нет других потоков, не являющихся демонами (возможно, вы захотите предоставить обработчик SIGINT, чтобы сигнализировать рабочим о корректном завершении работы Ctrl+C
вместо того чтобы просто умереть).
queue.LifoQueue()
используется для получения порядка "Последний пришел первым вышел" (он является приблизительным из-за нескольких потоков).
maxsize
не установлен, потому что в противном случае рабочие могут зайти в тупик - вы все равно должны поставить задачу куда-нибудь. worker_count
фоновые потоки работают независимо от очереди задач.
Это другое решение хорошо. Если вам нужен более «простой» параллелизм, особенно при попытке использовать многозадачные интерфейсы блокировки (например,requests
), модуль concurrent встроен и может иметь то, что вы хотите:
import concurrent.futures
def worker(num):
print(f'in worker with {num}')
return num*2
with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
for future in concurrent.futures.as_completed([executor.submit(worker, i) for i in range(10000)]):
print(future.result())