Пул потоков, похожий на пул многопроцессорных?
Существует ли класс Pool для рабочих потоков, аналогичный классу пула многопроцессорного модуля?
Мне нравится, например, простой способ распараллелить функцию карты
def long_running_func(p):
c_func_no_gil(p)
p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))
Однако я хотел бы сделать это без накладных расходов на создание новых процессов.
Я знаю о GIL. Однако в моем случае использования эта функция будет функцией C, связанной с вводом-выводом, для которой оболочка Python выпустит GIL перед фактическим вызовом функции.
Должен ли я написать свой собственный пул потоков?
9 ответов
Я только что узнал, что на самом деле есть интерфейс пула на основе потоков в multiprocessing
модуль, однако он несколько скрыт и не документирован надлежащим образом.
Это может быть импортировано через
from multiprocessing.pool import ThreadPool
Он реализован с использованием фиктивного класса Process, обертывающего поток Python. Этот класс процессов на основе потоков можно найти в multiprocessing.dummy
который кратко упоминается в документах. Этот фиктивный модуль предположительно обеспечивает весь многопроцессорный интерфейс на основе потоков.
В Python 3 вы можете использовать concurrent.futures.ThreadPoolExecutor
т.е.
executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)
Смотрите документы для получения дополнительной информации и примеров.
Да, и, кажется, (более или менее) тот же API.
import multiprocessing
def worker(lnk):
....
def start_process():
.....
....
if(PROCESS):
pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE,
initializer=start_process)
pool.map(worker, inputs)
....
Для чего-то очень простого и легкого (немного измененного отсюда):
from Queue import Queue
from threading import Thread
class Worker(Thread):
"""Thread executing tasks from a given tasks queue"""
def __init__(self, tasks):
Thread.__init__(self)
self.tasks = tasks
self.daemon = True
self.start()
def run(self):
while True:
func, args, kargs = self.tasks.get()
try:
func(*args, **kargs)
except Exception, e:
print e
finally:
self.tasks.task_done()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads):
self.tasks = Queue(num_threads)
for _ in range(num_threads):
Worker(self.tasks)
def add_task(self, func, *args, **kargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kargs))
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
if __name__ == '__main__':
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(100)]
def wait_delay(d):
print 'sleeping for (%d)sec' % d
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
Для поддержки обратных вызовов при завершении задачи вы можете просто добавить обратный вызов в кортеж задачи.
Привет, чтобы использовать пул потоков в Python, вы можете использовать эту библиотеку:
from multiprocessing.dummy import Pool as ThreadPool
и затем для использования, эта библиотека делает так:
pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results
Потоки - это количество потоков, которые вы хотите, а задачи - это список задач, которые больше всего соответствуют сервису.
Да, существует пул потоков, аналогичный пулу многопроцессорности, однако он несколько скрыт и не документирован должным образом. Вы можете импортировать его следующим образом:-
from multiprocessing.pool import ThreadPool
Просто покажу вам простой пример
def test_multithread_stringio_read_csv(self):
# see gh-11786
max_row_range = 10000
num_files = 100
bytes_to_df = [
'\n'.join(
['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
).encode() for j in range(num_files)]
files = [BytesIO(b) for b in bytes_to_df]
# read all files in many threads
pool = ThreadPool(8)
results = pool.map(self.read_csv, files)
first_result = results[0]
for result in results:
tm.assert_frame_equal(first_result, result)
Вот результат, который я наконец-то использовал. Это модифицированная версия классов от dgorissen выше.
Файл: threadpool.py
from queue import Queue, Empty
import threading
from threading import Thread
class Worker(Thread):
_TIMEOUT = 2
""" Thread executing tasks from a given tasks queue. Thread is signalable,
to exit
"""
def __init__(self, tasks, th_num):
Thread.__init__(self)
self.tasks = tasks
self.daemon, self.th_num = True, th_num
self.done = threading.Event()
self.start()
def run(self):
while not self.done.is_set():
try:
func, args, kwargs = self.tasks.get(block=True,
timeout=self._TIMEOUT)
try:
func(*args, **kwargs)
except Exception as e:
print(e)
finally:
self.tasks.task_done()
except Empty as e:
pass
return
def signal_exit(self):
""" Signal to thread to exit """
self.done.set()
class ThreadPool:
"""Pool of threads consuming tasks from a queue"""
def __init__(self, num_threads, tasks=[]):
self.tasks = Queue(num_threads)
self.workers = []
self.done = False
self._init_workers(num_threads)
for task in tasks:
self.tasks.put(task)
def _init_workers(self, num_threads):
for i in range(num_threads):
self.workers.append(Worker(self.tasks, i))
def add_task(self, func, *args, **kwargs):
"""Add a task to the queue"""
self.tasks.put((func, args, kwargs))
def _close_all_threads(self):
""" Signal all threads to exit and lose the references to them """
for workr in self.workers:
workr.signal_exit()
self.workers = []
def wait_completion(self):
"""Wait for completion of all the tasks in the queue"""
self.tasks.join()
def __del__(self):
self._close_all_threads()
def create_task(func, *args, **kwargs):
return (func, args, kwargs)
Использовать бассейн
from random import randrange
from time import sleep
delays = [randrange(1, 10) for i in range(30)]
def wait_delay(d):
print('sleeping for (%d)sec' % d)
sleep(d)
pool = ThreadPool(20)
for i, d in enumerate(delays):
pool.add_task(wait_delay, d)
pool.wait_completion()
Вот что выглядит многообещающе в Поваренной книге Python:
Рецепт 576519: Пул потоков с тем же API, что и (мульти) обработка. Бассейн (Python)
Другим способом может быть добавление процесса в пул очереди потоков
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
for i in range(10):
a = executor.submit(arg1, arg2,....)
Затраты на создание новых процессов минимальны, особенно когда их всего 4. Я сомневаюсь, что это горячая точка производительности вашего приложения. Сохраняйте это простым, оптимизируйте, где вам нужно и куда указывают результаты профилирования.
Нет встроенного пула на основе потоков. Однако может быть очень быстро реализовать очередь производителя / потребителя с Queue
учебный класс.
От: https://docs.python.org/2/library/queue.html
from threading import Thread
from Queue import Queue
def worker():
while True:
item = q.get()
do_work(item)
q.task_done()
q = Queue()
for i in range(num_worker_threads):
t = Thread(target=worker)
t.daemon = True
t.start()
for item in source():
q.put(item)
q.join() # block until all tasks are done
Если вы не против выполнить чужой код, вот мой:
Примечание. Есть много лишнего кода, который вы, возможно, захотите удалить [добавлен для лучшего понимания и демонстрации того, как это работает]
Примечание. Для имен методов и имен переменных использовались соглашения об именах Python вместо camelCase.
Порядок работы:
- Класс MultiThread будет запускаться без экземпляров потоков, разделяя блокировку, рабочую очередь, флаг выхода и результаты.
- SingleThread будет запущен MultiThread после создания всех экземпляров.
- Мы можем добавлять работы с помощью MultiThread (он позаботится о блокировке).
- SingleThreads будет обрабатывать рабочую очередь, используя блокировку посередине.
- Как только ваша работа будет сделана, вы можете уничтожить все потоки с общим логическим значением.
- Здесь работа может быть чем угодно. Он может автоматически импортировать (раскомментировать строку импорта) и обрабатывать модуль, используя заданные аргументы.
- Результаты будут добавлены к результатам, и мы сможем получить их с помощью get_results
Код:
import threading
import queue
class SingleThread(threading.Thread):
def __init__(self, name, work_queue, lock, exit_flag, results):
threading.Thread.__init__(self)
self.name = name
self.work_queue = work_queue
self.lock = lock
self.exit_flag = exit_flag
self.results = results
def run(self):
# print("Coming %s with parameters %s", self.name, self.exit_flag)
while not self.exit_flag:
# print(self.exit_flag)
self.lock.acquire()
if not self.work_queue.empty():
work = self.work_queue.get()
module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
self.lock.release()
print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
# module = __import__(module_name)
result = str(getattr(module, operation)(*args, **kwargs))
print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
self.results.append(result)
else:
self.lock.release()
# process_work_queue(self.work_queue)
class MultiThread:
def __init__(self, no_of_threads):
self.exit_flag = bool_instance()
self.queue_lock = threading.Lock()
self.threads = []
self.work_queue = queue.Queue()
self.results = []
for index in range(0, no_of_threads):
thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
thread.start()
self.threads.append(thread)
def add_work(self, work):
self.queue_lock.acquire()
self.work_queue._put(work)
self.queue_lock.release()
def destroy(self):
self.exit_flag.value = True
for thread in self.threads:
thread.join()
def get_results(self):
return self.results
class Work:
def __init__(self, module, operation, args, kwargs={}):
self.module = module
self.operation = operation
self.args = args
self.kwargs = kwargs
class SimpleOperations:
def sum(self, *args):
return sum([int(arg) for arg in args])
@staticmethod
def mul(a, b, c=0):
return int(a) * int(b) + int(c)
class bool_instance:
def __init__(self, value=False):
self.value = value
def __setattr__(self, key, value):
if key != "value":
raise AttributeError("Only value can be set!")
if not isinstance(value, bool):
raise AttributeError("Only True/False can be set!")
self.__dict__[key] = value
# super.__setattr__(key, bool(value))
def __bool__(self):
return self.value
if __name__ == "__main__":
multi_thread = MultiThread(5)
multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
while True:
data_input = input()
if data_input == "":
pass
elif data_input == "break":
break
else:
work = data_input.split()
multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
multi_thread.destroy()
print(multi_thread.get_results())