Многопроцессорная обработка на tee'd-генераторах

Рассмотрим следующий сценарий, в котором я тестирую два способа выполнения некоторых вычислений для генераторов, полученных itertools.tee:

#!/usr/bin/env python3

from sys import argv
from itertools import tee
from multiprocessing import Process

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def compute_double_sum(iterable):
    s = sum(map(double, iterable))
    print(s)

def square(x):
    return x * x

def compute_square_sum(iterable):
    s = sum(map(square, iterable))
    print(s)

g1, g2 = tee(my_generator(), 2)

try:
    processing_type = argv[1]
except IndexError:
    processing_type = "no_multi"

if processing_type == "multi":
    p1 = Process(target=compute_double_sum, args=(g1,))
    p2 = Process(target=compute_square_sum, args=(g2,))
    print("p1 starts")
    p1.start()
    print("p2 starts")
    p2.start()
    p1.join()
    print("p1 finished")
    p2.join()
    print("p2 finished")
else:
    compute_double_sum(g1)
    compute_square_sum(g2)

Вот что я получаю, когда запускаю скрипт в "нормальном" режиме:

$ ./test_tee.py 
0
1
2
3
4
20
30

И вот в параллельном режиме:

$ ./test_tee.py multi
p1 starts
p2 starts
0
1
2
3
4
20
0
1
2
3
4
30
p1 finished
p2 finished

Первоначальный генератор, видимо, как-то "копируется" и выполняется дважды.

Я хотел бы избежать этого, потому что в моем реальном приложении это, кажется, вызывает ошибку в одной из внешних библиотек, которые я использую для создания исходного генератора ( https://github.com/pysam-developers/pysam/issues/397), и при этом иметь возможность выполнять вычисления параллельно для тех же сгенерированных значений.

Есть ли способ добиться того, чего я хочу?

2 ответа

Решение

Я нашел альтернативный способ сделать это: /questions/27984539/primenit-umenshenie-na-vyihode-generatora-s-mnogoprotsessornostyu/27984546#27984546.

При таком подходе мы больше не используем генератор. Мы просто дублируем сгенерированные элементы и передаем их в составную функцию, которая выполняет параллельную обработку сгенерированных элементов только в рамках одного процесса, но мы используем многопроцессорность, используя Pool (это то, что называется подходом карты / сокращения?):

#!/usr/bin/env python3

from itertools import starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

def my_generator():
    for i in range(5):
        print(i)
        yield i

def double(x):
    return 2 * x

def square(x):
    return x * x

def double_and_square(args_list):
    return (double(*args_list[0]), square(*args_list[1]))

def sum_tuples(tup1, tup2):
    return tuple(starmap(add, zip(tup1, tup2)))

with Pool(processes=5) as pool:
    results_generator = pool.imap_unordered(double_and_square, (((arg,), (arg,)) for arg in my_generator()))

    print(reduce(sum_tuples, results_generator))

Это работает на игрушечном примере. Теперь я должен выяснить, как аналогичным образом организовать мои вычисления в реальном случае приложения.

Я попытался обобщить это, используя функцию более высокого порядка (make_funcs_applier) сгенерировать составную функцию (apply_funcs), но я получаю следующую ошибку:

AttributeError: Can't pickle local object  'make_funcs_applier.<locals>.apply_funcs'

Более обобщенная попытка

Основываясь на предложении в комментариях, я попытался улучшить приведенное выше решение, чтобы сделать его более пригодным для повторного использования:

#!/usr/bin/env python3
"""This script tries to work around some limitations of multiprocessing."""

from itertools import repeat, starmap
from multiprocessing import Pool
from functools import reduce
from operator import add

# Doesn't work because local functions can't be pickled:
# def make_tuple_func(funcs):
#     def tuple_func(args_list):
#         return tuple(func(args) for func, args in zip(funcs, args_list))
#     return tuple_func
#
# test_tuple_func = make_tuple_func((plus_one, double, square))

class FuncApplier(object):
    """This kind of object can be used to group functions and call them on a
    tuple of arguments."""
    __slots__ = ("funcs", )

    def __init__(self, funcs):
        self.funcs = funcs

    def __len__(self):
        return len(self.funcs)

    def __call__(self, args_list):
        return tuple(func(args) for func, args in zip(self.funcs, args_list))

    def fork_args(self, args_list):
        """Takes an arguments list and repeat them in a n-tuple."""
        return tuple(repeat(args_list, len(self)))


def sum_tuples(*tuples):
    """Element-wise sum of tuple items."""
    return tuple(starmap(add, zip(*tuples)))


# Can't define these functions in main:
# They wouldn't be pickleable.
def plus_one(x):
    return x + 1

def double(x):
    return 2 * x

def square(x):
    return x * x

def main():
    def my_generator():
        for i in range(5):
            print(i)
            yield i


    test_tuple_func = FuncApplier((plus_one, double, square))

    with Pool(processes=5) as pool:
        results_generator = pool.imap_unordered(
            test_tuple_func,
            (test_tuple_func.fork_args(args_list) for args_list in my_generator()))
        print("sum of x+1:\t%s\nsum of 2*x:\t%s\nsum of x*x:\t%s" % reduce(
            sum_tuples, results_generator))
    exit(0)

if __name__ == "__main__":
    exit(main())

Тестирование это:

$ ./test_fork.py 
0
1
2
3
4
sum of x+1: 15
sum of 2*x: 20
sum of x*x: 30

Для меня все еще есть некоторые раздражающие ограничения, потому что я часто определяю локальные функции в своем коде.

multiprocessing Система импортирует ваш основной модуль в каждый запускаемый процесс. Поэтому код модуля выполняется в каждом процессе.

Вы можете избежать этого, используя всегда рекомендуемые

if __name__ == '__main__':

после определения вашего класса и функции, поэтому код для основной программы выполняется только в начальном процессе. Предполагается, что это требование только для платформ Windows, но, возможно, стоит попробовать, поскольку вы жалуетесь на то, что код запускается дважды.

Другие вопросы по тегам