Постоянный запуск gpiozero listener с uvicorn
Я пытаюсь написать приложение python, которое будет работать на raspberry pi, которое будет иметь как подключение к сокету (socketio с uvicorn), так и физические прослушиватели ввода. Я намерен одновременно прослушивать соединение сокета и события gpio, не блокируя друг друга. Вот что у меня есть на данный момент:
api.py
import uvicorn
import asyncio
from interaction.volume import VolumeControl
from system.platform_info import PlatformInfo
from connection.api_socket import app
class Api:
def __init__(self):
pass
def initialize_volume_listener(self):
volume_controller = VolumeControl()
volume_controller.start_listener()
def start(self):
PlatformInfo().print_info()
self.initialize_volume_listener()
uvicorn.run(app, host='127.0.0.1', port=5000, loop="asyncio")
volume_control.py
import asyncio
from gpiozero import Button
from connection.api_socket import volume_up
class VolumeControl:
def __init__(self):
self.volume_up_button = Button(4)
def volume_up(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
future = asyncio.ensure_future(volume_up(None, None))
loop.run_until_complete(future)
loop.close()
def start_listener(self):
self.volume_up_button.when_pressed = self.volume_up
api_socket.py
import socketio
from system.platform_info import PlatformInfo
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
app = socketio.ASGIApp(sio)
@sio.on('connect')
async def test_connect(sid, environ):
system_info = PlatformInfo().get_info()
current_volume = 35
initial_data = {"system_info": system_info,
"settings": {"volume": current_volume}
}
await sio.emit('initial_data', initial_data, room=sid)
@sio.on('disconnect request')
async def disconnect_request(sid):
await sio.disconnect(sid)
@sio.on('disconnect')
async def test_disconnect(sid):
print('Client disconnected')
await sio.emit('disconnect', {'data': 'Connected', 'count': 0}, room=sid)
@sio.on('volume_up')
async def volume_up(sid, volume=None):
increased_volume = 25
await sio.emit('volume_up', {'volume': increased_volume})
@sio.on('volume_down')
async def volume_down(sid, volume=None):
decreased_volume = 25
await sio.emit('volume_down', {'volume': decreased_volume})
Я пробовал использовать asyncio, но я новичок в асинхронных функциях python. Проблема в том, что я не мог непрерывно запускать прослушиватель кнопок, так что, пока выполняются функции сокета, я мог одновременно прослушивать взаимодействия кнопок, не блокируя друг друга. Слушатель кнопок вообще не работает. Вместо этого мне нужно, чтобы прослушиватель кнопок работал, пока работает приложение uvicorn.
Любая помощь будет оценена по достоинству. Спасибо.
1 ответ
@ Мигель, спасибо за ответ. Как вы предложили, я запустил gpio вwhile
петля и используется asyncio.run()
внутри цикла для вызова соответствующей функции socketio. Работает как задумано. Боковое примечание: я начал поток gpio с параметромdaemon=True
. Это позволяет выйти из цикла gpio, как только я выйду из основного потока, которым является сервер uvicorn. Окончательный код выглядит следующим образом:
api_socket.py
@sio.on('video_load')
async def load_video(sid, video_number=3):
data = open(os.path.join(os.getcwd(), f'sample_videos/dummy_video_{str(video_number)}.mp4'), 'rb').read()
print('Streaming video...')
await sio.emit('video_load', {'source': data}, room=sid)
nfc_listener.py
class NFCListener:
reading = True
def __init__(self):
GPIO.setmode(GPIO.BOARD)
self.rdr = RFID()
util = self.rdr.util()
util.debug = True
self.listener_thread = threading.Thread(target=self.start_nfc, daemon=True)
def start_nfc(self):
selected_video = None
while self.reading:
self.rdr.wait_for_tag()
(error, data) = self.rdr.request()
if not error:
print("\nCard identified!")
(error, uid) = self.rdr.anticoll()
if not error:
# Print UID
card_uid = str(uid[0])+" "+str(uid[1])+" " + \
str(uid[2])+" "+str(uid[3])+" "+str(uid[4])
print(card_uid)
if card_uid == "166 107 86 20 143":
if selected_video != 2:
selected_video = 2
asyncio.run(load_video(None, selected_video))
else:
if selected_video != 3:
selected_video = 3
asyncio.run(load_video(None, selected_video))
def start_reading(self):
self.listener_thread.start()
Gpiozero создает новый поток, который выполняет обратный вызов (это не очень хорошо документировано). Если обратный вызов должен выполняться в основном цикле asyncio, вам необходимо передать управление обратно в основной поток.
Метод call_soon_threadsafe сделает это за вас. По сути, он добавляет обратный вызов в список задач, которые вызывает основной цикл asyncio, когда происходит ожидание.
Однако циклы asyncio являются локальными для каждого потока: см. Get_running_loop
Поэтому, когда объект gpiozero создается в основном потоке asyncio, вам нужно сделать этот объект цикла доступным для объекта при вызове обратного вызова.
Вот как я это делаю для PIR, который вызывает метод asyncio MQTT:
class PIR:
def __init__(self, mqtt, pin):
self.pir = MotionSensor(pin=pin)
self.pir.when_motion = self.motion
# store the mqtt client we'll need to call
self.mqtt = mqtt
# This PIR object is created in the main thread
# so store that loop object
self.loop = asyncio.get_running_loop()
def motion(self):
# motion is called in the gpiozero monitoring thread
# it has to use our stored copy of the loop and then
# tell that loop to call the callback:
self.loop.call_soon_threadsafe(self.mqtt.publish,
f'sensor/gpiod/pir/kitchen', True)
Вы, наверное, захотите этого:
import asyncio
from gpiozero import Button
from connection.api_socket import volume_up
class VolumeControl:
def __init__(self):
self.volume_up_button = Button(4)
self.loop = asyncio.get_running_loop()
def volume_up_cb(self):
self.loop.call_soon_threadsafe(volume_up, None, None)
def start_listener(self):
self.volume_up_button.when_pressed = self.volume_up_cb
Намного чище и безопаснее!:)