Как я могу читать по одной строке из трио ReceiveStream?
Асинсио имеет StreamReader.readline()
, позволяя что-то вроде:
while True:
line = await reader.readline()
...
(Я не вижу async for
доступно в Asyncio, но это будет очевидная эволюция)
Как мне получить эквивалент с трио?
Я не вижу никакой поддержки высокого уровня в трио 0,9. Все что я вижу это ReceiveStream.receive_some()
который возвращает двоичные фрагменты произвольного размера; мне кажется нетривиальным декодировать и преобразовывать это во что-то построчное. Можно ли использовать стандартную библиотечную функцию или фрагмент? Я нашел модуль io stdlib, который выглядит многообещающе, но я не вижу способа обеспечить метод "подачи".
2 ответа
Вы правы, в данный момент в Trio нет поддержки высокого уровня. Должно быть что-то, хотя я не уверен на 100%, как это должно выглядеть. (Если вам интересно обсудить возможные решения, мы могли бы это сделать.)
Между тем, ваша реализация выглядит разумно.
Если вы хотите сделать его еще более надежным, вы можете (1) использовать bytearray
вместо bytes
для вашего буфера, чтобы добавить и удалить амортизированный O(n) вместо O(n^2), (2) установить ограничение на максимальную длину строки, чтобы злые коллеги не могли заставить вас тратить бесконечную буферизацию памяти бесконечно долго линии, (3) возобновить каждый вызов find
в месте, где последний остановился вместо того, чтобы перезапускаться с начала каждый раз, снова, чтобы избежать поведения O (n ^ 2). Ничто из этого не является супер важным, если вы имеете дело только с разумными длинами линий и хорошо себя ведущими коллегами, но это также не повредит.
Вот измененная версия вашего кода, которая пытается включить эти три идеи:
class LineReader:
def __init__(self, stream, max_line_length=16384):
self.stream = stream
self._line_generator = self.generate_lines(max_line_length)
@staticmethod
def generate_lines(max_line_length):
buf = bytearray()
find_start = 0
while True:
newline_idx = buf.find(b'\n', find_start)
if newline_idx < 0:
# no b'\n' found in buf
if len(buf) > max_line_length:
raise ValueError("line too long")
# next time, start the search where this one left off
find_start = len(buf)
more_data = yield
else:
# b'\n' found in buf so return the line and move up buf
line = buf[:newline_idx+1]
# Update the buffer in place, to take advantage of bytearray's
# optimized delete-from-beginning feature.
del buf[:newline_idx+1]
# next time, start the search from the beginning
find_start = 0
more_data = yield line
if more_data is not None:
buf += bytes(more_data)
async def readline(self):
line = next(self._line_generator)
while line is None:
more_data = await self.stream.receive_some(1024)
if not more_data:
return b'' # this is the EOF indication expected by my caller
line = self._line_generator.send(more_data)
return line
(Не стесняйтесь использовать по любой лицензии.)
Я закончил тем, что написал это. Не проверено должным образом (исправления приветствуются), но, похоже, работает:
class LineReader:
def __init__(self, stream):
self.stream = stream
self._line_generator = self.generate_lines()
@staticmethod
def generate_lines():
buf = bytes()
while True:
newline_idx = buf.find(b'\n')
if newline_idx < 0:
# no b'\n' found in buf
more_data = yield
else:
# b'\n' found in buf so return the line and move up buf
line = buf[:newline_idx+1]
buf = buf[newline_idx+1:]
more_data = yield line
if more_data is not None:
buf += bytes(more_data)
async def readline(self):
line = next(self._line_generator)
while line is None:
more_data = await self.stream.receive_some(1024)
if not more_data:
return b'' # this is the EOF indication expected by my caller
line = self._line_generator.send(more_data)
return line
Тогда я могу обернуть ReceiveStream
с LineReader
и использовать его readline
метод. Добавление __aiter__()
а также __anext()__
тогда будет тривиально, но мне это не нужно в моем случае (я портирую что-то на трио, которое не использует async for
тем не мение).
Другим недостатком является то, что он предполагает UTF-8 или аналогичную кодировку, где b'\n'
символы новой строки существуют в объекте закодированных байтов без изменений.
Было бы неплохо полагаться на библиотечные функции, чтобы справиться с этим, хотя; другие ответы приветствуются.
Очень наивный подход, который я использую:
async def readline(stdout: trio.abc.ReceiveStream):
data = b""
while True:
_data = await stdout.receive_some()
if _data == b"":
break
data += _data
if data.endswith(b"\n"):
break
return data
# use it like this:
async def fn():
async with await trio.open_process(..., stdout=subprocess.PIPE) as process:
while True:
# instead of:
# data = process.stdout.receive_some()
# use this:
line = await readline(process.stdout)
if line == b"":
break