Поддерживает ли pytest multiprocessing.set_start_method?
Документ о multiprocessing.set_start_method
Обратите внимание, что:
Обратите внимание, что это следует вызывать не более одного раза, и оно должно быть защищено внутри предложения if name == 'main' основного модуля.
Однако, если я поставлю multiprocessing.set_start_method('spawn')
Я не знаю, будет ли он работать в модуле Pytest.
2 ответа
Если вы действительно хотите всегда использовать один и тот же метод запуска, вы можете установить его в фикстуре сеанса в файлеconftest.py
в корне вашего исходного дерева. Например
# conftest.py
import multiprocessing
import pytest
@pytest.fixture(scope="session", autouse=True)
def always_spawn():
multiprocessing.set_start_method("spawn")
Действительно, как указано в документации, у вас будут проблемы, если вы попытаетесь позвонить multiprocessing.set_start_method()
из нескольких функций модульных тестов. Более того, это повлияет на всю вашу программу и может плохо взаимодействовать со всем набором тестов.
Однако существует обходной путь, который также описан в документации:
В качестве альтернативы вы можете использовать
get_context()
для получения объекта контекста. Объекты контекста имеют тот же API, что и модуль многопроцессорности, и позволяют использовать несколько методов запуска в одной программе.import multiprocessing as mp def foo(q): q.put('hello') if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,)) p.start() print(q.get()) p.join() ```
Этот метод можно использовать для каждого теста, чтобы избежать обсуждаемых проблем совместимости. Его можно комбинировать с "monkeypatching" или "mocking" для тестирования вашего класса с различными методами запуска:
# my_class.py
import multiprocessing
class MyClass:
def __init__(self):
self._queue = multiprocessing.Queue()
def process(self, x):
# Very simplified example of a method using a multiprocessing Queue
self._queue.put(x)
return self._queue.get()
# tests/test_my_class.py
import multiprocessing
import my_class
def test_spawn(monkeypatch):
ctx = multiprocessing.get_context('spawn')
monkeypatch.setattr(my_class.multiprocessing, "Queue", ctx.Queue)
obj = my_class.MyClass()
assert obj.process(6) == 6
def test_fork(monkeypatch):
ctx = multiprocessing.get_context('fork')
monkeypatch.setattr(my_class.multiprocessing, "Queue", ctx.Queue)
obj = my_class.MyClass()
assert obj.process(6) == 6