Макет аргумента функции map_async приводит к PicklingError

Попытка написать несколько юнит-тестов вокруг функции, которая выполняет map_async() операция. В частности, я хочу подтвердить, что некоторые файлы очищаются в случае возникновения исключения в одном из процессов. Пример псевдокода с намерениями, указанными ниже.

foo.py

def write_chunk(chunk):
    ... create file from chunk
    return created_filename

class Foo:
    def write_parallel(chunks):
        filenames = set()
        try:
            pool = Pool(processes=2)
            pool.map_async(write_chunk, chunks, callback=filenames.add)
        except Exception:
            //handle exception
        finally:
            cleanup_files(filenames)

test_foo.py

@patch("foo.write_chunk")
def test_write_parallel_exception_cleanup(self, mock_write_chunk):
    def mock_side_effect(chunk):
        if "chunk_1" == chunk:
            raise Exception
        else:
            return chunk
    mock_write_chunk.side_effect = mock_side_effect

    foo = Foo()
    foo.write_parallel({"chunk_1", "chunk_2"})
    //assert "chunk_2" cleaned up and exception is thrown.

Однако, когда я иду для выполнения теста, я получаю следующую ошибку PicklingError: PicklingError: Can't pickle <class 'mock.MagicMock'>: it's not the same object as mock.MagicMock,

Любые идеи, как выполнить желаемый результат замены сопоставленной функции моей собственной фиктивной функцией?

2 ответа

Таким образом, поскольку проблема возникла из-за попыток Mock and Pickle функции, я решил вывести функциональность в отдельную функцию, смоделировать эту функцию, одновременно позволяя выбирать исходную функцию. Увидеть ниже:

foo.py

def write_chunk(chunk):
    return write_chunk_wrapped(chunk)

def write_chunk_wrapped(chunk)
    ... create file from chunk
    return created_filename

class Foo:
    def write_parallel(chunks):
        filenames = set()
        try:
            pool = Pool(processes=2)
            pool.map_async(write_chunk, chunks, callback=filenames.add)
        except Exception:
            //handle exception
        finally:
            cleanup_files(filenames)

test_foo.py

@patch("foo.write_chunk_wrapped")
def test_write_parallel_exception_cleanup(self, mock_write_chunk_wrapped):
    def mock_side_effect(chunk):
        if "chunk_1" == chunk:
            raise Exception
        else:
            return chunk
    mock_write_chunk_wrapped.side_effect = mock_side_effect

    foo = Foo()
    foo.write_parallel({"chunk_1", "chunk_2"})
    //assert "chunk_2" cleaned up and exception is thrown.

Вы можете издеваться над map_async() и создать для него фикстуру с помощью pytest. Таким образом, вы будете выполнять код синхронно для целей тестирования, позволяя исправление/насмешку и избегая ошибки травления.

      class MockPoolMapAsyncResult:
    """
    Mock for the `multiprocessing.pool.Pool.map_async` method.
    This mock executes the code in a syncronous way and enables the 
    usage of patches and mocks within the tests
    that run multiprocessing code. It also avoids errors with pickling.
    """
    def __init__(self, func, args):
        self._func = func
        self._args = args

    def get(self, timeout=0):
        result = [self._func(args) for args in self._args]
        return result


@fixture(autouse=True)
def mock_pool_map_async(monkeypatch):
    monkeypatch.setattr("multiprocessing.pool.Pool.map_async",
                        lambda self, func, args=():
                        MockPoolMapAsyncResult(func, args))

Источник: https://gist.github.com/antobarbero/de3bfd6e68672a5b305854d8c9e8cb5c .

Другие вопросы по тегам