Pytest зависает, когда на сервере флешки запускается другой поток

Я использую Python3, Flask 0.12 и Pytest 3.0.7.

У меня есть приложение фляги, похожее на это:

class AppInitializer:
    def __init__(self):
        pass

    @staticmethod
    def __function_to_be_refreshed():
        while True:
            try:
                time.sleep(5)
            except Exception as e:
                logger.exception(e)


    def create_app(self):
        daemon_thread = threading.Thread(target=self.__function_to_be_refreshed)
        daemon_thread.daemon = True
        daemon_thread.start()
        atexit.register(daemon_thread.join)
        app_ = Flask(__name__)
        return app_


app_initializer = AppInitializer()
app = app_initializer.create_app()

Я пытаюсь протестировать это приложение с помощью pytest следующим образом:

import unittest

import pytest


class TestCheckPriceRequestAPI(unittest.TestCase):
    def setUp(self):
        self.app = api.app.test_client()

    def test_random(self):
        pass

Когда я запускаю этот тест, используя pytestэтот тест (наряду со всеми другими тестами) выполняется успешно, но pytest зависает. Как остановить запущенный процесс pytest (или, возможно, уничтожить поток демона)?

1 ответ

Решение

join Команда означает только то, что многопоточность будет ждать окончания потока, но не завершит его. Чтобы завершить поток бесконечным циклом, вы можете сделать так:

class AppInitializer:
    def __init__(self):
        self._run = True
        self._daemon_thread = None

    def __function_to_be_refreshed(self):
        while self._run:
            try:
                time.sleep(5)
            except Exception as e:
                logger.exception(e)

    def __shutdown(self):
        self._run = False
        if self._daemon_thread:
            self._daemon_thread.join()

    def create_app(self):
        self._daemon_thread = threading.Thread(target=self.__function_to_be_refreshed)
        ...
        atexit.register(self.__shutdown)
        ...
Другие вопросы по тегам