Как я могу читать по одной строке из трио ReceiveStream?

Асинсио имеет StreamReader.readline(), позволяя что-то вроде:

while True:
    line = await reader.readline()
    ...

(Я не вижу async for доступно в Asyncio, но это будет очевидная эволюция)

Как мне получить эквивалент с трио?

Я не вижу никакой поддержки высокого уровня в трио 0,9. Все что я вижу это ReceiveStream.receive_some() который возвращает двоичные фрагменты произвольного размера; мне кажется нетривиальным декодировать и преобразовывать это во что-то построчное. Можно ли использовать стандартную библиотечную функцию или фрагмент? Я нашел модуль io stdlib, который выглядит многообещающе, но я не вижу способа обеспечить метод "подачи".

2 ответа

Вы правы, в данный момент в Trio нет поддержки высокого уровня. Должно быть что-то, хотя я не уверен на 100%, как это должно выглядеть. (Если вам интересно обсудить возможные решения, мы могли бы это сделать.)

Между тем, ваша реализация выглядит разумно.

Если вы хотите сделать его еще более надежным, вы можете (1) использовать bytearray вместо bytes для вашего буфера, чтобы добавить и удалить амортизированный O(n) вместо O(n^2), (2) установить ограничение на максимальную длину строки, чтобы злые коллеги не могли заставить вас тратить бесконечную буферизацию памяти бесконечно долго линии, (3) возобновить каждый вызов find в месте, где последний остановился вместо того, чтобы перезапускаться с начала каждый раз, снова, чтобы избежать поведения O (n ^ 2). Ничто из этого не является супер важным, если вы имеете дело только с разумными длинами линий и хорошо себя ведущими коллегами, но это также не повредит.

Вот измененная версия вашего кода, которая пытается включить эти три идеи:

class LineReader:
    def __init__(self, stream, max_line_length=16384):
        self.stream = stream
        self._line_generator = self.generate_lines(max_line_length)

    @staticmethod
    def generate_lines(max_line_length):
        buf = bytearray()
        find_start = 0
        while True:
            newline_idx = buf.find(b'\n', find_start)
            if newline_idx < 0:
                # no b'\n' found in buf
                if len(buf) > max_line_length:
                    raise ValueError("line too long")
                # next time, start the search where this one left off
                find_start = len(buf)
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                # Update the buffer in place, to take advantage of bytearray's
                # optimized delete-from-beginning feature.
                del buf[:newline_idx+1]
                # next time, start the search from the beginning
                find_start = 0
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

(Не стесняйтесь использовать по любой лицензии.)

Я закончил тем, что написал это. Не проверено должным образом (исправления приветствуются), но, похоже, работает:

class LineReader:
    def __init__(self, stream):
        self.stream = stream
        self._line_generator = self.generate_lines()

    @staticmethod
    def generate_lines():
        buf = bytes()
        while True:
            newline_idx = buf.find(b'\n')
            if newline_idx < 0:
                # no b'\n' found in buf
                more_data = yield
            else:
                # b'\n' found in buf so return the line and move up buf
                line = buf[:newline_idx+1]
                buf = buf[newline_idx+1:]
                more_data = yield line

            if more_data is not None:
                buf += bytes(more_data)

    async def readline(self):
        line = next(self._line_generator)
        while line is None:
            more_data = await self.stream.receive_some(1024)
            if not more_data:
                return b''  # this is the EOF indication expected by my caller
            line = self._line_generator.send(more_data)
        return line

Тогда я могу обернуть ReceiveStream с LineReader и использовать его readline метод. Добавление __aiter__() а также __anext()__ тогда будет тривиально, но мне это не нужно в моем случае (я портирую что-то на трио, которое не использует async for тем не мение).

Другим недостатком является то, что он предполагает UTF-8 или аналогичную кодировку, где b'\n' символы новой строки существуют в объекте закодированных байтов без изменений.

Было бы неплохо полагаться на библиотечные функции, чтобы справиться с этим, хотя; другие ответы приветствуются.

Очень наивный подход, который я использую:

async def readline(stdout: trio.abc.ReceiveStream):
    data = b""
    while True:
        _data = await stdout.receive_some()
        if _data == b"":
            break
        data += _data
        if data.endswith(b"\n"):
            break
    return data

# use it like this:
async def fn():
    async with await trio.open_process(..., stdout=subprocess.PIPE) as process:
        while True:
            # instead of:
            #   data = process.stdout.receive_some()
            # use this:
            line = await readline(process.stdout)
            if line == b"":
                break
Другие вопросы по тегам