Queue.asyncio ValueError: task_done() вызывается слишком много раз - ошибка кодирования или обнаружена ошибка?
Я реализовал фрагмент кода, который получает элемент из одной очереди и помещает один и тот же объект в каждую очередь из списка очередей. Проблема в том, что когда я запускаю определенный тест, я получаю ValueError: task_done() called too many times
исключение. Эта ошибка возникает в тестовом коде, а не в тестируемом коде.
Я использую asyncio.Queue
и программирование с использованием сопрограмм. Я соответствовал каждому Queue.get
с одним точно Queue.task_done
вызов. Я тестирую код с помощью pytest.
Я использую следующие библиотеки:
- Python 3.7
- pytest == 3.10.0
- pytest-asyncio == 0.9.0
У меня есть два файла: middleware.py
который содержит мою реализацию класса и test_middleware.py
который реализует тест pytest.
файл middlware.py
:
import asyncio
class DistributorMiddleware:
def __init__(self, in_queue, out_list_queue):
self._in = in_queue
self._out = out_list_queue
async def distribute(self):
while True:
ele = await self._in.get()
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
queue.task_done()
if ele == None:
break
for queue in self._out:
await queue.join()
файл test_middleware.py
:
import pytest
import asyncio
from asyncio import Queue
from middleware import DistributorMiddleware
import random
import os
@pytest.mark.asyncio
async def test_distribution(request, event_loop):
q_list = [ Queue() for _ in range(10) ]
_in = Queue()
distrib = DistributorMiddleware(_in, q_list)
event_loop.create_task(distrib.distribute())
num_ele = random.randint(1, 10)
ele_set = set()
for _ in range(num_ele):
ele = os.urandom(4)
ele_set.add(ele)
await _in.put(ele)
await _in.put(None)
await asyncio.sleep(1)
for i,q in enumerate(q_list):
assert q.qsize() == num_ele + 1
c_set = ele_set.copy()
count= 0
while True:
e = await q.get()
count+=1
print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
q.task_done()
if e == None:
break
assert e in c_set
c_set.remove(e)
В тесте промежуточное ПО должно получать элементы из входной очереди и помещать их в 10 очередей из списка. И это делает работу правильно.
Тестовый код получает все элементы из каждой из 10 очередей и проверяет, присутствуют ли они в исходной очереди. Для 9 первых очередей все идет хорошо, без ошибок, но когда тест пытается получить первый элемент из десятого списка, ValueError
Поднялся:
request = <FixtureRequest for <Function 'test_distribution'>>, event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
@pytest.mark.asyncio
async def test_distribution(request, event_loop):
q_list = [ Queue() for _ in range(10) ]
_in = Queue()
distrib = DistributorMiddleware(_in, q_list)
event_loop.create_task(distrib.distribute())
num_ele = random.randint(1, 10)
ele_set = set()
for _ in range(num_ele):
ele = os.urandom(4)
ele_set.add(ele)
await _in.put(ele)
await _in.put(None)
await asyncio.sleep(1)
for i,q in enumerate(q_list):
assert q.qsize() == num_ele + 1
c_set = ele_set.copy()
count= 0
while True:
e = await q.get()
count+=1
print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
> q.task_done()
test_middlewares.py:34:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f7af5b9d828 maxsize=0 _queue=[b'\x15\xad\t\xaf', b'\x8b\xa2M=', None]>
def task_done(self):
"""Indicate that a formerly enqueued task is complete.
Used by queue consumers. For each get() used to fetch a task,
a subsequent call to task_done() tells the queue that the processing
on the task is complete.
If a join() is currently blocking, it will resume when all items have
been processed (meaning that a task_done() call was received for every
item that had been put() into the queue).
Raises ValueError if called more times than there were items placed in
the queue.
"""
if self._unfinished_tasks <= 0:
> raise ValueError('task_done() called too many times')
E ValueError: task_done() called too many times
/usr/lib/python3.7/asyncio/queues.py:202: ValueError
каждый get
соответствует task_done
, Я могу подтвердить выполнение следующей модификации на test_middlware.py
файл:
- q.task_done()
+ try:
+ q.task_done()
+ except ValueError as err:
+ print(f'Value Error: {err}')
+ print(q.qsize())
Делая это, я могу видеть это даже со многими ValueError
будучи поднятым, элементы продолжают извлекаться из очереди. Тест пройден успешно:
platform linux -- Python 3.7.1, pytest-3.10.0, py-1.7.0, pluggy-0.8.0
rootdir: /tmp/stack, inifile:
plugins: asyncio-0.9.0
collected 1 item
test_middlewares.py . [100%]
============================================================================================ 1 passed in 1.04 seconds =============================================================================================
Чтобы убедиться, что тест использует все элементы из всех списков, я вызвал ошибку, добавив ложное утверждение в конце теста:
assert e in c_set
c_set.remove(e)
+ assert False == True
+
Вывод результатов показывает, что все элементы извлекаются из всех списков, но каждый task_done в последней очереди генерирует ValueError
,
Queue 7: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 7: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 7: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 7: element: "None" number 4 extracted of 0!
Queue 8: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 8: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 8: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 8: element: "None" number 4 extracted of 0!
Queue 9: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
============================================================================================ 1 failed in 1.06 seconds ==
Вопрос в том, что я что-то упустил, и в моем коде есть ошибка, или я нашел ошибку?
1 ответ
У вас есть ошибка в вашем коде. В самом деле, queue.task_done()
должен вызываться только при извлечении элементов из очереди, а не при помещении их в очередь.
Но ваш промежуточный класс вызывает его в очереди, которую он только что использовал .put()
на последнюю очередь в self._out
список; удалить queue.task_done()
позвонить с DistributorMiddleware.distribute()
:
async def distribute(self):
while True:
ele = await self._in.get()
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
queue.task_done()
# ^^^^^ you didn't take anything from the queue here!
Когда вы удаляете эту строку, ваш тест проходит.
Причина, по которой вы видите исключение, возникшее в тесте, заключается в том, что только тогда очередь узнает task_done()
звонили слишком часто. queue.task_done()
вызывать DistributorMiddleware.distribute()
уменьшает счетчик незавершенных задач на 1, но только когда этот счетчик падает ниже нуля, аномалия может быть обнаружена. И вы доберетесь до того момента, когда последнее задание будет удалено из очереди в test_distribution()
, в этот момент счетчик незавершенных задач достиг 0 как минимум на один шаг раньше.
Возможно, это был призыв к self._in.task_done()
вместо? Вы только что получили элемент из этой очереди в этом while
цикл:
async def distribute(self):
while True:
ele = await self._in.get()
# getting an element from self._in
count=0
for queue in self._out:
await queue.put(ele)
count+=1
print(f'inserted ele in {count}')
self._in.task_done()
# done with ele, so decrement the self._in unfinished tasks counter