Постоянный запуск gpiozero listener с uvicorn

Я пытаюсь написать приложение python, которое будет работать на raspberry pi, которое будет иметь как подключение к сокету (socketio с uvicorn), так и физические прослушиватели ввода. Я намерен одновременно прослушивать соединение сокета и события gpio, не блокируя друг друга. Вот что у меня есть на данный момент:

api.py

import uvicorn
import asyncio
from interaction.volume import VolumeControl
from system.platform_info import PlatformInfo
from connection.api_socket import app


class Api:
    def __init__(self):
        pass

    def initialize_volume_listener(self):
        volume_controller = VolumeControl()
        volume_controller.start_listener()

    def start(self):
        PlatformInfo().print_info()
        self.initialize_volume_listener()
        uvicorn.run(app, host='127.0.0.1', port=5000, loop="asyncio")

volume_control.py

import asyncio
from gpiozero import Button
from connection.api_socket import volume_up


class VolumeControl:
    def __init__(self):
        self.volume_up_button = Button(4)

    def volume_up(self):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        future = asyncio.ensure_future(volume_up(None, None))
        loop.run_until_complete(future)
        loop.close()

    def start_listener(self):
        self.volume_up_button.when_pressed = self.volume_up

api_socket.py

import socketio
from system.platform_info import PlatformInfo

sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*')
app = socketio.ASGIApp(sio)


@sio.on('connect')
async def test_connect(sid, environ):
    system_info = PlatformInfo().get_info()
    current_volume = 35
    initial_data = {"system_info": system_info,
                    "settings": {"volume": current_volume}
                    }
    await sio.emit('initial_data', initial_data, room=sid)


@sio.on('disconnect request')
async def disconnect_request(sid):
    await sio.disconnect(sid)


@sio.on('disconnect')
async def test_disconnect(sid):
    print('Client disconnected')
    await sio.emit('disconnect', {'data': 'Connected', 'count': 0}, room=sid)


@sio.on('volume_up')
async def volume_up(sid, volume=None):
    increased_volume = 25
    await sio.emit('volume_up', {'volume': increased_volume})


@sio.on('volume_down')
async def volume_down(sid, volume=None):
    decreased_volume = 25
    await sio.emit('volume_down', {'volume': decreased_volume})

Я пробовал использовать asyncio, но я новичок в асинхронных функциях python. Проблема в том, что я не мог непрерывно запускать прослушиватель кнопок, так что, пока выполняются функции сокета, я мог одновременно прослушивать взаимодействия кнопок, не блокируя друг друга. Слушатель кнопок вообще не работает. Вместо этого мне нужно, чтобы прослушиватель кнопок работал, пока работает приложение uvicorn.

Любая помощь будет оценена по достоинству. Спасибо.

1 ответ

Решение

@ Мигель, спасибо за ответ. Как вы предложили, я запустил gpio вwhile петля и используется asyncio.run()внутри цикла для вызова соответствующей функции socketio. Работает как задумано. Боковое примечание: я начал поток gpio с параметромdaemon=True. Это позволяет выйти из цикла gpio, как только я выйду из основного потока, которым является сервер uvicorn. Окончательный код выглядит следующим образом:

api_socket.py

@sio.on('video_load')
async def load_video(sid, video_number=3):
    data = open(os.path.join(os.getcwd(), f'sample_videos/dummy_video_{str(video_number)}.mp4'), 'rb').read()
    print('Streaming video...')
    await sio.emit('video_load', {'source': data}, room=sid)

nfc_listener.py

class NFCListener:

    reading = True

    def __init__(self):
        GPIO.setmode(GPIO.BOARD)
        self.rdr = RFID()
        util = self.rdr.util()
        util.debug = True
        self.listener_thread = threading.Thread(target=self.start_nfc, daemon=True)

    def start_nfc(self):
        selected_video = None
        while self.reading:
            self.rdr.wait_for_tag()
            (error, data) = self.rdr.request()
            if not error:
                print("\nCard identified!")
                (error, uid) = self.rdr.anticoll()
                if not error:
                    # Print UID
                    card_uid = str(uid[0])+" "+str(uid[1])+" " + \
                        str(uid[2])+" "+str(uid[3])+" "+str(uid[4])
                    print(card_uid)
                    if card_uid == "166 107 86 20 143":
                        if selected_video != 2:
                            selected_video = 2
                            asyncio.run(load_video(None, selected_video))
                    else:
                        if selected_video != 3:
                            selected_video = 3
                            asyncio.run(load_video(None, selected_video))

    def start_reading(self):
        self.listener_thread.start()

Gpiozero создает новый поток, который выполняет обратный вызов (это не очень хорошо документировано). Если обратный вызов должен выполняться в основном цикле asyncio, вам необходимо передать управление обратно в основной поток.

Метод call_soon_threadsafe сделает это за вас. По сути, он добавляет обратный вызов в список задач, которые вызывает основной цикл asyncio, когда происходит ожидание.

Однако циклы asyncio являются локальными для каждого потока: см. Get_running_loop

Поэтому, когда объект gpiozero создается в основном потоке asyncio, вам нужно сделать этот объект цикла доступным для объекта при вызове обратного вызова.

Вот как я это делаю для PIR, который вызывает метод asyncio MQTT:

class PIR:
    def __init__(self, mqtt, pin):
        self.pir = MotionSensor(pin=pin)
        self.pir.when_motion = self.motion
        # store the mqtt client we'll need to call
        self.mqtt = mqtt
        # This PIR object is created in the main thread
        # so store that loop object
        self.loop = asyncio.get_running_loop()

    def motion(self):
        # motion is called in the gpiozero monitoring thread
        # it has to use our stored copy of the loop and then
        # tell that loop to call the callback:
        self.loop.call_soon_threadsafe(self.mqtt.publish,
                                       f'sensor/gpiod/pir/kitchen', True)

Вы, наверное, захотите этого:

import asyncio
from gpiozero import Button
from connection.api_socket import volume_up

class VolumeControl:
    def __init__(self):
        self.volume_up_button = Button(4)
        self.loop = asyncio.get_running_loop()

    def volume_up_cb(self):
        self.loop.call_soon_threadsafe(volume_up, None, None)

    def start_listener(self):
        self.volume_up_button.when_pressed = self.volume_up_cb

Намного чище и безопаснее!:)

Другие вопросы по тегам