Python ThreadPool с ограниченным размером очереди задач

Моя проблема заключается в следующем: у меня есть multiprocessing.pool.ThreadPool объект с worker_count рабочие и основные pqueue из которого я кормлю задачи в пул.

Процесс выглядит следующим образом: есть основной цикл, который получает элемент level уровень от pqueue и отправляет его в пул с помощью apply_async, Когда элемент обрабатывается, он генерирует элементы level + 1, Проблема в том, что пул принимает все задачи и обрабатывает их в порядке их отправки.

Точнее, происходит то, что level 0 элементы обрабатываются и каждый генерирует 100 level 1 элементы, которые немедленно получены из pqueue и добавили в пул, каждый level 1 пункт производит 100 level 2 элементы, которые передаются в пул и т. д., и элементы обрабатываются в BFS-режиме.

Мне нужно сказать пулу не принимать больше, чем worker_count предметы, чтобы дать возможность получить более высокий уровень из pqueue для обработки элементов в манере DFS.

Текущее решение, с которым я пришел: для каждой отправленной задачи сохраните AsyncResult объект в asyncres_list список, а также перед извлечением элементов из pqueue Я удаляю элементы, которые были обработаны (если есть), проверяю длину asyncres_list меньше, чем количество потоков в пуле каждые 0,5 секунды, и только так thread_number элементы будут обработаны в то же время.

Мне интересно, есть ли более чистый способ достижения такого поведения, и я не могу найти в документации некоторые параметры для ограничения максимального количества задач, которые могут быть переданы в пул.

2 ответа

ThreadPool это простой инструмент для общей задачи. Если вы хотите сами управлять очередью, чтобы получить поведение DFS; Вы могли бы реализовать необходимую функциональность на вершине threading а также queue модули напрямую.

Чтобы предотвратить планирование следующей корневой задачи до тех пор, пока все задачи, порожденные текущей задачей, не будут выполнены (в порядке "DFS"), вы можете использоватьQueue.join():

#!/usr/bin/env python3
import queue
import random
import threading
import time

def worker(q, multiplicity=5, maxlevel=3, lock=threading.Lock()):
    for task in iter(q.get, None):  # blocking get until None is received
        try:
            if len(task) < maxlevel:
                for i in range(multiplicity):
                    q.put(task + str(i))  # schedule the next level
            time.sleep(random.random())  # emulate some work
            with lock:
                print(task)
        finally:
            q.task_done()

worker_count = 2
q = queue.LifoQueue()
threads = [threading.Thread(target=worker, args=[q], daemon=True)
           for _ in range(worker_count)]
for t in threads:
    t.start()

for task in "01234":  # populate the first level
    q.put(task)
    q.join()  # block until all spawned tasks are done
for _ in threads:  # signal workers to quit
    q.put(None)
for t in threads:  # wait until workers exit
    t.join()

Пример кода получен из примера в queue модульная документация.

Задача на каждом уровне порождает multiplicity прямые дочерние задачи, которые порождают свои собственные подзадачи до maxlevel достигнуто

None используется, чтобы сигнализировать рабочим, что они должны уйти. t.join() используется, чтобы ждать, пока потоки не выйдут изящно. Если основной поток прерывается по какой-либо причине, то потоки демона уничтожаются, если нет других потоков, не являющихся демонами (возможно, вы захотите предоставить обработчик SIGINT, чтобы сигнализировать рабочим о корректном завершении работы Ctrl+C вместо того чтобы просто умереть).

queue.LifoQueue() используется для получения порядка "Последний пришел первым вышел" (он является приблизительным из-за нескольких потоков).

maxsize не установлен, потому что в противном случае рабочие могут зайти в тупик - вы все равно должны поставить задачу куда-нибудь. worker_count фоновые потоки работают независимо от очереди задач.

Это другое решение хорошо. Если вам нужен более «простой» параллелизм, особенно при попытке использовать многозадачные интерфейсы блокировки (например,requests), модуль concurrent встроен и может иметь то, что вы хотите:

      import concurrent.futures

def worker(num):
    print(f'in worker with {num}')
    return num*2

with concurrent.futures.ThreadPoolExecutor(max_workers=16) as executor:
    for future in concurrent.futures.as_completed([executor.submit(worker, i) for i in range(10000)]):
        print(future.result())
Другие вопросы по тегам