Поддерживает ли pytest multiprocessing.set_start_method?

Документ о multiprocessing.set_start_method Обратите внимание, что:

Обратите внимание, что это следует вызывать не более одного раза, и оно должно быть защищено внутри предложения if name == 'main' основного модуля.

Однако, если я поставлю multiprocessing.set_start_method('spawn') Я не знаю, будет ли он работать в модуле Pytest.

2 ответа

Если вы действительно хотите всегда использовать один и тот же метод запуска, вы можете установить его в фикстуре сеанса в файлеconftest.pyв корне вашего исходного дерева. Например

      # conftest.py
import multiprocessing
import pytest

@pytest.fixture(scope="session", autouse=True)
def always_spawn():
    multiprocessing.set_start_method("spawn")

Действительно, как указано в документации, у вас будут проблемы, если вы попытаетесь позвонить multiprocessing.set_start_method() из нескольких функций модульных тестов. Более того, это повлияет на всю вашу программу и может плохо взаимодействовать со всем набором тестов.

Однако существует обходной путь, который также описан в документации:

В качестве альтернативы вы можете использовать get_context() для получения объекта контекста. Объекты контекста имеют тот же API, что и модуль многопроцессорности, и позволяют использовать несколько методов запуска в одной программе.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join() ```

Этот метод можно использовать для каждого теста, чтобы избежать обсуждаемых проблем совместимости. Его можно комбинировать с "monkeypatching" или "mocking" для тестирования вашего класса с различными методами запуска:

# my_class.py

import multiprocessing

class MyClass:

    def __init__(self):
        self._queue = multiprocessing.Queue()

    def process(self, x):
        # Very simplified example of a method using a multiprocessing Queue 
        self._queue.put(x)
        return self._queue.get()
# tests/test_my_class.py

import multiprocessing
import my_class

def test_spawn(monkeypatch):
    ctx = multiprocessing.get_context('spawn')
    monkeypatch.setattr(my_class.multiprocessing, "Queue", ctx.Queue)
    obj = my_class.MyClass()
    assert obj.process(6) == 6

def test_fork(monkeypatch):
    ctx = multiprocessing.get_context('fork')
    monkeypatch.setattr(my_class.multiprocessing, "Queue", ctx.Queue)
    obj = my_class.MyClass()
    assert obj.process(6) == 6
Другие вопросы по тегам