Трио: чтение нескольких задач с одного и того же диска
У меня есть файловый дескриптор, и я хотел бы прочитать его с несколькими задачами. Каждый запрос read() на fd будет возвращать полный, независимый пакет данных (при условии, что данные доступны).
Моя наивная реализация заключалась в том, чтобы каждый работник выполнял следующий цикл:
async def work_loop(fd):
while True:
await trio.hazmat.wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)
К сожалению, это не работает, потому что трио повышает ResourceBusyError
если несколько задач блокируются на одном и том же fd. Поэтому моей следующей итерацией было написать пользовательскую функцию ожидания:
async def work_loop(fd):
while True:
await my_wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)
где
read_queue = trio.hazmat.ParkingLot()
async def my_wait_readable():
if name is None:
name = trio.hazmat.current_task().name
while True:
try:
log.debug('%s: Waiting for fd to become readable...', name)
await trio.hazmat.wait_readable(fd)
except trio.ResourceBusyError:
log.debug('%s: Resource busy, parking in read queue.', name)
await read_queue.park()
continue
log.debug('%s: fd readable, unparking next task.', name)
read_queue.unpark()
break
Однако в тестах я получаю такие сообщения:
2018-09-18 13:09:17.219 pyfuse3-worker-37: Waiting for fd to become readable...
2018-09-18 13:09:17.219 pyfuse3-worker-47: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-53: Waiting for fd to become readable...
2018-09-18 13:09:17.220 pyfuse3-worker-51: fd readable, unparking next task.
2018-09-18 13:09:17.220 pyfuse3-worker-51: doing work
2018-09-18 13:09:17.221 pyfuse3-worker-47: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-37: Resource busy, parking in read queue.
2018-09-18 13:09:17.221 pyfuse3-worker-53: Resource busy, parking in read queue.
Другими словами:
- Все задачи входят
trio.hazmat.wait_readable
- Одна задача успешно возвращается и пытается отменить следующую задачу (но ее нет)
- Другие задачи получают BusyError и парковаться сами
- Больше ничего не происходит, так как все рабочие припаркованы
Как правильно решить эту проблему?
1 ответ
Несколько читателей с одного и того же fd не имеют смысла, использование Trio (или нет) не меняет этот основной факт. Почему вы пытаетесь сделать это в первую очередь?
Если по какой-то причине вам действительно требуется параллельное выполнение нескольких задач для последующей обработки данных, используйте одну задачу чтения, чтобы добавить данные в очередь и позволить вашим задачам обработки получать свои данные из этого.
В качестве альтернативы вы можете использовать блокировку:
read_lock = trio.Lock()
async def work_loop(fd):
while True:
async with read_lock:
await trio.hazmat.wait_readable(fd)
buf = os.read(fd, BUFSIZE)
if not buf:
break
await do_work(buf)