asyncio.Queue.put 以外的错误?
Wrong except in asyncio.Queue.put?
asyncio\queues.py
@coroutine
def put(self, item):
"""Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
This method is a coroutine.
"""
while self.full():
putter = futures.Future(loop=self._loop)
self._putters.append(putter)
try:
yield from putter
except:
putter.cancel() # Just in case putter is not done yet.
if not self.full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
raise
return self.put_nowait(item)
在我看来,putter
可以通过cancel
、set_exception
或set_result
来完成。 get_nowait
使用 set_result
。只有 cancel
和 set_exception
会抛出异常,然后 except:
会发生。我认为不需要 except:
。
为什么要在Wake up the next in line
上加一个except:
?
更新:@Vincent
_wakeup_next
呼叫 set_result
。 set_result
将执行 self._state = _FINISHED
。 task1.cancel()
将 self._fut_waiter.cancel()
其中 return 错误。因此,任务 1 将未取消。
@Vincent 非常感谢
关键原因是task.cancel可以取消任务,虽然任务等待的未来已经set_result(self._state = _FINISHED).
如果等待 putter
的任务被取消,yield from putter
引发 CancelledError
。这可能在调用 get_nowait()
之后发生,并且您希望确保通知其他推杆队列中有新插槽可用。
这是一个例子:
async def main():
# Create a full queue
queue = asyncio.Queue(1)
await queue.put('A')
# Schedule two putters as tasks
task1 = asyncio.ensure_future(queue.put('B'))
task2 = asyncio.ensure_future(queue.put('C'))
await asyncio.sleep(0)
# Make room in the queue, print 'A'
print(queue.get_nowait())
# Cancel task 1 before giving the control back to the event loop
task1.cancel()
# Thankfully, the putter in task 2 has been notified
await task2
# Print 'C'
print(await queue.get())
编辑:关于内部发生的事情的更多信息:
queue.get_nowait()
:调用了putter.set_result(None)
;推杆状态现在是 FINISHED
,task1
将在控制权交还给事件循环时唤醒。
task1.cancel()
:task1._fut_waiter
已经完成,所以task1._must_cancel
设置为True
,以便下次CancelledError
[=18] =] 运行。
await task2
:
- 控制权交还给控制回路,
task1._step()
运行。一个CancelledError
被扔进协程里面:task1._coro.throw(CancelledError())
.
queue.put
捕获异常。由于队列未满且 'B'
不会被插入,因此必须通知队列中的下一个推杆:self._wakeup_next(self._putters)
。
- 然后
CancelledError
被再次加注并陷入task1._step()
。 task1
现在实际上取消了自身 (super().cancel()
)。
asyncio\queues.py
@coroutine
def put(self, item):
"""Put an item into the queue.
Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.
This method is a coroutine.
"""
while self.full():
putter = futures.Future(loop=self._loop)
self._putters.append(putter)
try:
yield from putter
except:
putter.cancel() # Just in case putter is not done yet.
if not self.full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
raise
return self.put_nowait(item)
在我看来,putter
可以通过cancel
、set_exception
或set_result
来完成。 get_nowait
使用 set_result
。只有 cancel
和 set_exception
会抛出异常,然后 except:
会发生。我认为不需要 except:
。
为什么要在Wake up the next in line
上加一个except:
?
更新:@Vincent
_wakeup_next
呼叫 set_result
。 set_result
将执行 self._state = _FINISHED
。 task1.cancel()
将 self._fut_waiter.cancel()
其中 return 错误。因此,任务 1 将未取消。
@Vincent 非常感谢
关键原因是task.cancel可以取消任务,虽然任务等待的未来已经set_result(self._state = _FINISHED).
如果等待 putter
的任务被取消,yield from putter
引发 CancelledError
。这可能在调用 get_nowait()
之后发生,并且您希望确保通知其他推杆队列中有新插槽可用。
这是一个例子:
async def main():
# Create a full queue
queue = asyncio.Queue(1)
await queue.put('A')
# Schedule two putters as tasks
task1 = asyncio.ensure_future(queue.put('B'))
task2 = asyncio.ensure_future(queue.put('C'))
await asyncio.sleep(0)
# Make room in the queue, print 'A'
print(queue.get_nowait())
# Cancel task 1 before giving the control back to the event loop
task1.cancel()
# Thankfully, the putter in task 2 has been notified
await task2
# Print 'C'
print(await queue.get())
编辑:关于内部发生的事情的更多信息:
queue.get_nowait()
:调用了putter.set_result(None)
;推杆状态现在是FINISHED
,task1
将在控制权交还给事件循环时唤醒。task1.cancel()
:task1._fut_waiter
已经完成,所以task1._must_cancel
设置为True
,以便下次CancelledError
[=18] =] 运行。await task2
:- 控制权交还给控制回路,
task1._step()
运行。一个CancelledError
被扔进协程里面:task1._coro.throw(CancelledError())
. queue.put
捕获异常。由于队列未满且'B'
不会被插入,因此必须通知队列中的下一个推杆:self._wakeup_next(self._putters)
。- 然后
CancelledError
被再次加注并陷入task1._step()
。task1
现在实际上取消了自身 (super().cancel()
)。
- 控制权交还给控制回路,