Как правильно выходить из потока?

У меня есть Connection объект, который используется для хранения потоков чтения и записи asyncio соединения:

class Connection(object):

    def __init__(self, stream_in, stream_out):
        object.__init__(self)

        self.__in = stream_in
        self.__out = stream_out

    def read(self, n_bytes : int = -1):
        return self.__in.read(n_bytes)

    def write(self, bytes_ : bytes):
        self.__out.write(bytes_)
        yield from self.__out.drain()

На стороне сервера connected создает Connection объект каждый раз, когда клиент соединяется, затем читает 4 байта.

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    data = yield from conn.read(4)
    print(data)

А на стороне клиента выписано 4 байта.

@asyncio.coroutine
def client(loop):
    ...
    conn = Connection(stream_in, stream_out)
    yield from conn.write(b'test')

Это работает почти так, как ожидалось, но я должен yield from каждый read а также write вызов. я пробовал yield fromизнутри Connection:

def read(self, n_bytes : int = -1):
    data = yield from self.__in.read(n_bytes)
    return data

Но вместо того, чтобы получать данные, я получаю вывод, как

<generator object StreamReader.read at 0x1109983b8>

Если я позвоню read а также write из нескольких мест, я бы предпочел не повторять yield fromкаждый раз; а держать их внутри Connection, Моя конечная цель - сократить мои new_conection функция к этому:

@asyncio.coroutine
def new_conection(stream_in, stream_out):
    conn = Connection(stream_in, stream_out)
    print(conn.read(4))

2 ответа

Решение

Так как StreamReader.read это сопрограмма, ваши единственные варианты для его вызова: а) завернуть его в Task или же Future и запустить это через цикл событий, б) await в сопрограмме, определенной с async def или в) используя yield from с ним из сопрограммы определяется как функция, украшенная @asyncio.coroutine,

поскольку Connection.read вызывается из цикла событий (через сопрограмму new_connection), вы не можете повторно использовать этот цикл событий для запуска Task или же Future за StreamReader.read: петли событий не могут быть запущены, пока они уже запущены. Вы должны были бы либо остановить цикл обработки событий (что губительно и, вероятно, это невозможно сделать правильно), либо создать новый цикл обработки событий (грязно и не использовать цель использования сопрограмм). Ни один из них не желателен, так Connection.read должен быть сопрограммой или async функция.

Два других варианта (await в async def сопрограмма или yield from в @asyncio.coroutine функция) в основном эквивалентны. Единственная разница в том, что async def а также await были добавлены в Python 3.5, поэтому для 3.4, используя yield from а также @asyncio.coroutine это единственный вариант (сопрограммы и asyncio до 3.4 не существовало, поэтому другие версии не имеют значения). Лично я предпочитаю использовать async def а также await, потому что определение сопрограмм с async def чище и понятнее, чем с декоратором.

Вкратце: есть Connection.read а также new_connection быть сопрограммы (используя либо декоратор или async ключевое слово) и использовать await (или же yield from) при вызове других сопрограмм (await conn.read(4) в new_connection, а также await self.__in.read(n_bytes) в Connection.read).

Я обнаружил, что фрагмент исходного кода StreamReader в строке 620 на самом деле является прекрасным примером использования функции.

В моем предыдущем ответе я упустил из виду тот факт, что self.__in.read(n_bytes) это не только сопрограмма (что я должен был знать, учитывая, что это было из asyncio модуль XD), но он дает результат на линии. Так что это на самом деле генератор, и вам нужно будет от него отказаться.

Заимствуя этот цикл из исходного кода, ваша функция чтения должна выглядеть примерно так:

def read(self, n_bytes : int = -1):
    data = bytearray() #or whatever object you are looking for
    while 1:
        block = yield from self.__in.read(n_bytes)
        if not block:
            break
        data += block
    return data

Так как self.__in.read(n_bytes) является генератором, вы должны продолжать приносить из него, пока он не даст пустой результат, чтобы сигнализировать об окончании чтения. Теперь ваша функция чтения должна возвращать данные, а не генератор. Вам не придется уступать этой версии conn.read(),

Другие вопросы по тегам