Пул потоков, похожий на пул многопроцессорных?

Существует ли класс Pool для рабочих потоков, аналогичный классу пула многопроцессорного модуля?

Мне нравится, например, простой способ распараллелить функцию карты

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

Однако я хотел бы сделать это без накладных расходов на создание новых процессов.

Я знаю о GIL. Однако в моем случае использования эта функция будет функцией C, связанной с вводом-выводом, для которой оболочка Python выпустит GIL перед фактическим вызовом функции.

Должен ли я написать свой собственный пул потоков?

9 ответов

Решение

Я только что узнал, что на самом деле есть интерфейс пула на основе потоков в multiprocessing модуль, однако он несколько скрыт и не документирован надлежащим образом.

Это может быть импортировано через

from multiprocessing.pool import ThreadPool

Он реализован с использованием фиктивного класса Process, обертывающего поток Python. Этот класс процессов на основе потоков можно найти в multiprocessing.dummy который кратко упоминается в документах. Этот фиктивный модуль предположительно обеспечивает весь многопроцессорный интерфейс на основе потоков.

В Python 3 вы можете использовать concurrent.futures.ThreadPoolExecutor т.е.

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

Смотрите документы для получения дополнительной информации и примеров.

Да, и, кажется, (более или менее) тот же API.

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....

Для чего-то очень простого и легкого (немного измененного отсюда):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

Для поддержки обратных вызовов при завершении задачи вы можете просто добавить обратный вызов в кортеж задачи.

Привет, чтобы использовать пул потоков в Python, вы можете использовать эту библиотеку:

from multiprocessing.dummy import Pool as ThreadPool

и затем для использования, эта библиотека делает так:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

Потоки - это количество потоков, которые вы хотите, а задачи - это список задач, которые больше всего соответствуют сервису.

Да, существует пул потоков, аналогичный пулу многопроцессорности, однако он несколько скрыт и не документирован должным образом. Вы можете импортировать его следующим образом:-

from multiprocessing.pool import ThreadPool

Просто покажу вам простой пример

def test_multithread_stringio_read_csv(self):
        # see gh-11786
        max_row_range = 10000
        num_files = 100

        bytes_to_df = [
            '\n'.join(
                ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
            ).encode() for j in range(num_files)]
        files = [BytesIO(b) for b in bytes_to_df]

        # read all files in many threads
        pool = ThreadPool(8)
        results = pool.map(self.read_csv, files)
        first_result = results[0]

        for result in results:
            tm.assert_frame_equal(first_result, result) 

Вот результат, который я наконец-то использовал. Это модифицированная версия классов от dgorissen выше.

Файл: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

Использовать бассейн

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

Вот что выглядит многообещающе в Поваренной книге Python:

Рецепт 576519: Пул потоков с тем же API, что и (мульти) обработка. Бассейн (Python)

Другим способом может быть добавление процесса в пул очереди потоков

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)

Затраты на создание новых процессов минимальны, особенно когда их всего 4. Я сомневаюсь, что это горячая точка производительности вашего приложения. Сохраняйте это простым, оптимизируйте, где вам нужно и куда указывают результаты профилирования.

Нет встроенного пула на основе потоков. Однако может быть очень быстро реализовать очередь производителя / потребителя с Queue учебный класс.

От: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

Если вы не против выполнить чужой код, вот мой:

Примечание. Есть много лишнего кода, который вы, возможно, захотите удалить [добавлен для лучшего понимания и демонстрации того, как это работает]

Примечание. Для имен методов и имен переменных использовались соглашения об именах Python вместо camelCase.

Порядок работы:

  1. Класс MultiThread будет запускаться без экземпляров потоков, разделяя блокировку, рабочую очередь, флаг выхода и результаты.
  2. SingleThread будет запущен MultiThread после создания всех экземпляров.
  3. Мы можем добавлять работы с помощью MultiThread (он позаботится о блокировке).
  4. SingleThreads будет обрабатывать рабочую очередь, используя блокировку посередине.
  5. Как только ваша работа будет сделана, вы можете уничтожить все потоки с общим логическим значением.
  6. Здесь работа может быть чем угодно. Он может автоматически импортировать (раскомментировать строку импорта) и обрабатывать модуль, используя заданные аргументы.
  7. Результаты будут добавлены к результатам, и мы сможем получить их с помощью get_results

Код:

import threading
import queue


class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results

    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)

class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)

    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue._put(work)
        self.queue_lock.release()

    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()

    def get_results(self):
        return self.results


class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs


class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])

    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)


class bool_instance:
    def __init__(self, value=False):
        self.value = value

    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))

    def __bool__(self):
        return self.value

if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())
Другие вопросы по тегам