Ошибка многопроцессорной обработки Python

Я сожалею, что не могу воспроизвести ошибку на более простом примере, и мой код слишком сложен для публикации. Если я запускаю программу в оболочке IPython вместо обычного python, все работает хорошо.

Я посмотрел несколько предыдущих заметок по этой проблеме. Все они были вызваны использованием пула для вызова функции, определенной в функции класса. Но это не так для меня.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Буду признателен за любую помощь.

ОБНОВЛЕНИЕ: Функция, которую я выбираю, определяется на верхнем уровне модуля. Хотя он вызывает функцию, которая содержит вложенную функцию. то есть f () вызывает g (), вызывает h (), которая имеет вложенную функцию i (), а я вызываю pool.apply_async (f). f (), g (), h () все определены на верхнем уровне. Я попробовал более простой пример с этим шаблоном, и он работает, хотя.

12 ответов

Решение

Вот список того, что можно мариновать. В частности, функции можно выбирать только в том случае, если они определены на верхнем уровне модуля.

Этот кусок кода:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()

выдает ошибку, почти идентичную той, которую вы опубликовали:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Проблема в том, что pool все методы используют queue.Queue передавать задачи рабочим процессам. Все, что проходит через queue.Queue должен быть отборным, и foo.work не может быть выбран, так как он не определен на верхнем уровне модуля.

Это можно исправить, определив функцию на верхнем уровне, которая вызывает foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Заметить, что foo выбирается, так как Foo определяется на верхнем уровне и foo.__dict__ маринованные

Я бы использовал pathos.multiprocesssing, вместо multiprocessing, pathos.multiprocessing это вилка multiprocessing который использует dill, dill Вы можете сериализовать почти все в Python, так что вы можете отправлять намного больше параллельно. pathos У fork также есть возможность работать напрямую с несколькими аргументными функциями, как вам нужно для методов класса.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Получить pathos (и если хотите, dill) здесь: https://github.com/uqfoundation

Когда возникает эта проблема multiprocessing простое решение - переключиться с Pool к ThreadPool. Это можно сделать без изменения кода, кроме импорта-

from multiprocessing.pool import ThreadPool as Pool

Это работает, потому что ThreadPool разделяет память с основным потоком, а не создает новый процесс - это означает, что травление не требуется.

Обратной стороной этого метода является то, что python не лучший язык для обработки потоков - он использует так называемую глобальную блокировку интерпретатора, чтобы оставаться потокобезопасным, что может замедлить некоторые варианты использования. Однако, если вы в основном взаимодействуете с другими системами (выполняете HTTP-команды, общаетесь с базой данных, записываете в файловые системы), ваш код, скорее всего, не привязан к процессору и не сильно пострадает. Фактически, при написании тестов HTTP/HTTPS я обнаружил, что используемая здесь потоковая модель имеет меньше накладных расходов и задержек, так как накладные расходы на создание новых процессов намного выше, чем накладные расходы на создание новых потоков.

Так что, если вы обрабатываете массу вещей в пользовательском пространстве Python, это может быть не лучший метод.

Как уже говорили другие multiprocessing может передавать только объекты Python рабочим процессам, которые можно обрабатывать. Если вы не можете реорганизовать свой код, как описано в unutbu, вы можете использовать dills расширенные возможности выбора / удаления для передачи данных (особенно данных кода), как я покажу ниже.

Это решение требует только установки dill и никаких других библиотек pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

Я обнаружил, что могу также сгенерировать именно эту ошибку на идеально работающем фрагменте кода, пытаясь использовать на нем профилировщик.

Обратите внимание, что это было в Windows (где разветвление немного менее элегантно).

Я бегал:

python -m profile -o output.pstats <script> 

И обнаружил, что удаление профилирования удалило ошибку, а размещение профилирования восстановило ее. Сводил меня с ума, потому что я знал, что код работал. Я проверял, не обновлял ли что-то файл pool.py... затем у меня возникло чувство опускания, и я устранил профилирование, вот и все.

Публикация здесь для архивов на случай, если кто-то еще столкнется с ним.

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Эта ошибка также возникнет, если у вас есть встроенная функция внутри объекта модели, которая была передана в асинхронное задание.

Поэтому убедитесь, что переданные объекты модели не имеют встроенных функций. (В нашем случае мы использовали FieldTracker() функция django-model-utils внутри модели для отслеживания определенного поля). Вот ссылка на соответствующий вопрос GitHub.

Быстрое исправление - сделать функцию глобальной

      from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x
    
    @staticmethod
    def test(x):
        return x**2


    def test_apply(self, list_):
        global r
        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l



if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))

Это решение требует только установки укропа и никаких других библиотек, как пафос

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

Это также работает для NumPy массивов.

Вы случайно не передаете массив строк?

У меня была точно такая же ошибка, когда я передаю массив, который содержит пустую строку. Я думаю, что это может быть связано с этой ошибкой: http://projects.scipy.org/numpy/ticket/1658

Основываясь на решении @rocksportrocker, имеет смысл укропить при отправке и получении результатов.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

Как предложил @penky Suresh в этом ответе , не используйте встроенные ключевые слова.

Видимо args является встроенным ключевым словом при работе с многопроцессорностью

      
with ProcessPoolExecutor(max_workers=10) as executor:
# Using args here is fine. 
            future_processes = {
                executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                    print(f"Generated data for comment process: {future}")


# Dont use 'args' here. It seems to be a built-in keyword.
# Changing 'args' to 'arg' worked for me.
def process_and_render_item(arg):
        print(arg)


У меня была та же проблема и много других с многопроцессорностью. Я решил поместить то, что я изучил, в небольшой скрипт на языке Python с открытым исходным кодом, который я назвал многопроцессорной для детей. Я думаю, что это делает использование многопроцессорности действительно простым. Вы можете найти его на GitHub:

https://github.com/predictedblog/multiprocessing_for_kids.

Я также написал 2 сообщения в блоге с примерами того, как его использовать:

https://predicted.blog/multiprocessing-for-kids/

https://predicted.blog/multiprocessing-for-kids-shared-variables/

Вы используете функцию doMultiprocessingLoop(yourFunction, Iterator) для запуска yourFunction в нескольких процессах.

Я просто хочу помочь людям, которые сталкиваются с одними и теми же проблемами, используя многопроцессорность снова и снова, как я. Это работает для многих простых случаев использования, таких как разделение переменных между процессами и возврат значений из них. Возможно даже прекращение всех процессов путем возврата результата. Пожалуйста, прочитайте упомянутые сообщения в блоге для получения дополнительной информации. Прочитав их, вы, по крайней мере, лучше поймете, как работает многопроцессорная среда и где существуют ограничения.

Не стесняйтесь редактировать скрипт, если вы хотите добавить функциональность. Тяговые запросы также приветствуются. Если вы хотите сделать что-то большее из этого, не стесняйтесь связаться со мной, и я дам вам разрешения администратора.

Другие вопросы по тегам