Queue.asyncio ValueError: task_done() called too many times - Coding error or a bug detected?
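I implemented a piece of code that takes an element from one queue and puts the same object into every queue in a list of queues. The problem is that when I run a specific test, I get a ValueError: task_done() called too many times exception. The error occurs in the test code, not in the code under test.

I'm using asyncio.Queue and programming with coroutines. I matched each Queue.get with exactly one Queue.task_done call. I'm using pytest.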
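For reference, a minimal sketch of the pairing asyncio.Queue expects: the consumer calls task_done() once for every item it took out with get(), and the producer only ever calls put(). The consumer() and demo() names and the None sentinel are illustrative assumptions that mirror the question, not part of asyncio.

```python
import asyncio


async def consumer(queue: asyncio.Queue, results: list) -> None:
    """Drain items until a None sentinel arrives, calling task_done() once per get()."""
    while True:
        item = await queue.get()       # take one item out of the queue...
        try:
            if item is None:           # sentinel: stop consuming
                return
            results.append(item)       # "process" the item
        finally:
            queue.task_done()          # ...and mark exactly that item as done


async def demo() -> list:
    queue = asyncio.Queue()            # created inside the running loop
    results = []
    worker = asyncio.create_task(consumer(queue, results))
    for item in range(3):
        await queue.put(item)          # producer side: put() only, never task_done()
    await queue.put(None)
    await queue.join()                 # returns once every put() item has been marked done
    await worker
    return results


print(asyncio.run(demo()))  # prints [0, 1, 2]
```

Putting task_done() in a finally clause is one way to keep the one-to-one pairing even if processing an item raises.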
Test code
I'm using the following libraries:
- Python 3.7
- pytest==3.10.0
- pytest-asyncio==0.9.0
I have two files: middleware.py, which contains my class implementation, and test_middleware.py, which implements the pytest test.
File middleware.py:
import asyncio

class DistributorMiddleware:

    def __init__(self, in_queue, out_list_queue):
        self._in = in_queue
        self._out = out_list_queue

    async def distribute(self):
        while True:
            ele = await self._in.get()
            count=0
            for queue in self._out:
                await queue.put(ele)
                count+=1
                print(f'inserted ele in {count}')
            queue.task_done()
            if ele == None:
                break
        for queue in self._out:
            await queue.join()
File test_middleware.py:
import pytest
import asyncio
from asyncio import Queue
from middleware import DistributorMiddleware
import random
import os


@pytest.mark.asyncio
async def test_distribution(request, event_loop):
    q_list = [ Queue() for _ in range(10) ]
    _in = Queue()
    distrib = DistributorMiddleware(_in, q_list)
    event_loop.create_task(distrib.distribute())
    num_ele = random.randint(1, 10)
    ele_set = set()
    for _ in range(num_ele):
        ele = os.urandom(4)
        ele_set.add(ele)
        await _in.put(ele)
    await _in.put(None)
    await asyncio.sleep(1)

    for i,q in enumerate(q_list):
        assert q.qsize() == num_ele + 1
        c_set = ele_set.copy()
        count= 0
        while True:
            e = await q.get()
            count+=1
            print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
            q.task_done()
            if e == None:
                break
            assert e in c_set
            c_set.remove(e)
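In the test, the middleware should take elements from the input queue and put them into the 10 queues in the list, and it does this correctly.
The test code takes all the elements from the 10 queues and checks that each one was among the elements originally put into the input queue. For the first 9 queues everything goes fine, without errors, but when the test tries to take the first element from the tenth queue, a ValueError is raised: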
request = <FixtureRequest for <Function 'test_distribution'>>, event_loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
    @pytest.mark.asyncio
    async def test_distribution(request, event_loop):
        q_list = [ Queue() for _ in range(10) ]
        _in = Queue()
        distrib = DistributorMiddleware(_in, q_list)
        event_loop.create_task(distrib.distribute())
        num_ele = random.randint(1, 10)
        ele_set = set()
        for _ in range(num_ele):
            ele = os.urandom(4)
            ele_set.add(ele)
            await _in.put(ele)
        await _in.put(None)
        await asyncio.sleep(1)

        for i,q in enumerate(q_list):
            assert q.qsize() == num_ele + 1
            c_set = ele_set.copy()
            count= 0
            while True:
                e = await q.get()
                count+=1
                print(f'Queue {i}: element: "{e}" number {count} extracted of {q.qsize()}!')
>               q.task_done()
test_middlewares.py:34:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Queue at 0x7f7af5b9d828 maxsize=0 _queue=[b'\x15\xad\t\xaf', b'\x8b\xa2M=', None]>
    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by queue consumers. For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items have
        been processed (meaning that a task_done() call was received for every
        item that had been put() into the queue).

        Raises ValueError if called more times than there were items placed in
        the queue.
        """
        if self._unfinished_tasks <= 0:
>           raise ValueError('task_done() called too many times')
E           ValueError: task_done() called too many times
/usr/lib/python3.7/asyncio/queues.py:202: ValueError
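The docstring above is the whole story: the check is a plain counter. put() increments it, task_done() decrements it, and get() leaves it alone, so task_done() succeeds even for an item nobody has taken out yet. A minimal sketch that triggers the same error with a single queue (the function name over_acknowledge is hypothetical):

```python
import asyncio


async def over_acknowledge() -> str:
    queue = asyncio.Queue()
    await queue.put("item")   # one item placed in the queue -> one unfinished task
    queue.task_done()         # accepted even though nothing was taken out; counter drops to 0
    try:
        queue.task_done()     # more task_done() calls than items put() -> ValueError
    except ValueError as err:
        return str(err)
    return "no error"


print(asyncio.run(over_acknowledge()))  # prints: task_done() called too many times
```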
Every get is matched by a task_done. I can verify this by making the following change to the test_middleware.py file:
-            q.task_done()
+            try:
+                q.task_done()
+            except ValueError as err:
+                print(f'Value Error: {err}')
+                print(q.qsize())
Doing this, I can see that even though many ValueErrors are raised, the elements are still retrieved from the queue, and the test succeeds:
platform linux -- Python 3.7.1, pytest-3.10.0, py-1.7.0, pluggy-0.8.0
rootdir: /tmp/stack, inifile:
plugins: asyncio-0.9.0
collected 1 item
test_middlewares.py . [100%]
============================================================================================ 1 passed in 1.04 seconds =============================================================================================
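To make sure the test consumed all the elements from all the queues, I forced a failure by adding a false assertion at the end of the test: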
            assert e in c_set
            c_set.remove(e)
+    assert False == True
+
The resulting output shows that all elements were retrieved from all the queues, but every task_done() on the last queue raised a ValueError.
Queue 7: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 7: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 7: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 7: element: "None" number 4 extracted of 0!
Queue 8: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
Queue 8: element: "b'\x15\xad\t\xaf'" number 2 extracted of 2!
Queue 8: element: "b'\x8b\xa2M='" number 3 extracted of 1!
Queue 8: element: "None" number 4 extracted of 0!
Queue 9: element: "b'\x9b\xf8m\x02'" number 1 extracted of 3!
============================================================================================ 1 failed in 1.06 seconds ==
So the question is: am I missing something, is there a bug in my code, or have I found a bug?
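Your code has a bug. queue.task_done() should only be called when you take elements out of a queue, not when you put elements into it.
But your middleware class calls it on a queue it just used .put() on, namely the last queue in the self._out list. Remove the queue.task_done() call from DistributorMiddleware.distribute():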
async def distribute(self):
    while True:
        ele = await self._in.get()
        count=0
        for queue in self._out:
            await queue.put(ele)
            count+=1
            print(f'inserted ele in {count}')
        queue.task_done()
      # ^^^^^ you didn't take anything from the queue here!
When you remove that line, your test passes.
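The reason you see the exception in the test is that only then does the queue find out that task_done() was called too often. Each queue.task_done() call in DistributorMiddleware.distribute() decrements the last queue's unfinished-tasks counter by 1, but the error can only be detected when a call would push that counter below zero. Because distribute() calls task_done() once for every element it puts into the last queue, that queue's counter is already 0 before test_distribution() takes anything out of it, so the very first q.task_done() the test makes on the last queue raises (and, with your try/except in place, every one after it).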
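The counting can be reproduced in miniature. The sketch below mimics the buggy distribute() loop with three output queues instead of ten and drains them with get_nowait() instead of the test's awaited get(); reproduce() and its parameters are hypothetical names. Only the last queue reports errors, and on every task_done() call, which matches the output in the question.

```python
import asyncio


async def reproduce(num_queues: int = 3, num_items: int = 2) -> dict:
    """Mimic the buggy distribute(): task_done() on the last output queue after each put round."""
    out = [asyncio.Queue() for _ in range(num_queues)]
    items = list(range(num_items)) + [None]   # None sentinel, as in the question
    for ele in items:
        for queue in out:
            await queue.put(ele)
        queue.task_done()   # bug: 'queue' is the last output queue, and nothing was taken from it

    # The "test" side: drain every queue, counting task_done() calls that raise.
    errors = {}
    for i, q in enumerate(out):
        errors[i] = 0
        while not q.empty():
            q.get_nowait()
            try:
                q.task_done()
            except ValueError:
                errors[i] += 1
    return errors


print(asyncio.run(reproduce()))  # prints {0: 0, 1: 0, 2: 3}
```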
Perhaps that was meant to be a call to self._in.task_done()? You did just take an element from that queue in the while loop:
async def distribute(self):
    while True:
        ele = await self._in.get()
        # getting an element from self._in
        count=0
        for queue in self._out:
            await queue.put(ele)
            count+=1
            print(f'inserted ele in {count}')
        self._in.task_done()
        # done with ele, so decrement the self._in unfinished tasks counter
        if ele == None:
            break
    for queue in self._out:
        await queue.join()
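Putting the fix together, a complete sketch of the corrected class with a small driver might look like the following. It assumes the same None sentinel as the question and drops the count/print debugging; the drain() consumer and run_distribution() driver are hypothetical helpers added for illustration, and all queues are created inside the running loop so the sketch also behaves on Python 3.7 through 3.9.

```python
import asyncio


class DistributorMiddleware:
    """Corrected sketch: task_done() is called on the queue the item was taken from."""

    def __init__(self, in_queue, out_list_queue):
        self._in = in_queue
        self._out = out_list_queue

    async def distribute(self):
        while True:
            ele = await self._in.get()
            for queue in self._out:
                await queue.put(ele)
            self._in.task_done()      # matches the get() on self._in above
            if ele is None:
                break
        for queue in self._out:
            await queue.join()        # wait until consumers have handled every copy


async def drain(queue, received):
    """Hypothetical consumer: one task_done() per get(), stops at the None sentinel."""
    while True:
        ele = await queue.get()
        queue.task_done()
        if ele is None:
            return
        received.append(ele)


async def run_distribution(num_queues=3, items=(b"a", b"b")):
    in_queue = asyncio.Queue()
    out_queues = [asyncio.Queue() for _ in range(num_queues)]
    distributor = asyncio.create_task(
        DistributorMiddleware(in_queue, out_queues).distribute())
    for ele in items:
        await in_queue.put(ele)
    await in_queue.put(None)
    await in_queue.join()             # returns once distribute() has acknowledged every input
    received = [[] for _ in out_queues]
    await asyncio.gather(*(drain(q, r) for q, r in zip(out_queues, received)))
    await distributor                 # its join() calls complete once the queues are drained
    return received


print(asyncio.run(run_distribution()))  # prints [[b'a', b'b'], [b'a', b'b'], [b'a', b'b']]
```

Waiting on in_queue.join() shows that the fixed class now acknowledges its own input queue, while the output queues are acknowledged only by their consumers, so distribute() finishes after every copy has been drained.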