Есть ли лучшие способы запустить uvicorn в потоке?
Uvicorn не будет работать внутри потока, потому что сигналы не работают в потоках. Простое удаление обработки сигналов останавливает закрытие сервера (необходимо принудительно закрыть)
Мое решение мешало __new__
функция, чтобы получить объект сервера и создать функцию выключения, а затем привязать ее к сигналу вне потока.
Однако это действительно уродливое решение. Есть способы лучше?
def run():
'''
Start uvicorn server
returns exit function
'''
server = None
old_new = uvicorn.Server.__new__
def spoof_server(self, *_, **__):
'''Interfeer with __new__ to set server'''
nonlocal server
server = old_new(self)
return server
uvicorn.Server.__new__ = spoof_server
uvicorn.Server.install_signal_handlers = lambda *_, **__: None
Thread(target=uvicorn.run, args=[make_app()]).start()
def exit_server():
print('exiting...')
server.handle_exit(None, None)
return exit_server
1 ответ
Я тоже искал что-то подобное. Я нашел этот ответ, который помог мне. /questions/54715062/kak-ispolzovat-fastapi-i-uvicornrun-ne-blokiruya-potok/56527020#56527020
Выложу фрагмент сюда:
import contextlib
import time
import threading
import uvicorn
class Server(uvicorn.Server):
def install_signal_handlers(self):
pass
@contextlib.contextmanager
def run_in_thread(self):
thread = threading.Thread(target=self.run)
thread.start()
try:
while not self.started:
time.sleep(1e-3)
yield
finally:
self.should_exit = True
thread.join()
config = Config("example:app", host="127.0.0.1", port=5000, log_level="info")
server = Server(config=config)
with server.run_in_thread():
# Server is started.
...
# Server will be stopped once code put here is completed
...
# Server stopped.