asyncio.Queue.put 以外的错误?

Wrong except in asyncio.Queue.put?

asyncio\queues.py

@coroutine
def put(self, item):
    """Put an item into the queue.

    Put an item into the queue. If the queue is full, wait until a free
    slot is available before adding item.

    This method is a coroutine.
    """
    while self.full():
        putter = futures.Future(loop=self._loop)
        self._putters.append(putter)
        try:
            yield from putter
        except:
            putter.cancel()  # Just in case putter is not done yet.
            if not self.full() and not putter.cancelled():
                # We were woken up by get_nowait(), but can't take
                # the call.  Wake up the next in line.
                self._wakeup_next(self._putters)
            raise
    return self.put_nowait(item)

在我看来,putter可以通过cancelset_exceptionset_result来完成。 get_nowait 使用 set_result。只有 cancelset_exception 会抛出异常,然后 except: 会发生。我认为不需要 except:

为什么要在Wake up the next in line上加一个except:

更新:@Vincent _wakeup_next 呼叫 set_resultset_result 将执行 self._state = _FINISHEDtask1.cancel()self._fut_waiter.cancel() 其中 return 错误。因此,任务 1 将未取消

@Vincent 非常感谢

关键原因是task.cancel可以取消任务,虽然任务等待的未来已经set_result(self._state = _FINISHED).

如果等待 putter 的任务被取消,yield from putter 引发 CancelledError。这可能在调用 get_nowait() 之后发生,并且您希望确保通知其他推杆队列中有新插槽可用。

这是一个例子:

async def main():
    # Create a full queue
    queue = asyncio.Queue(1)
    await queue.put('A')
    # Schedule two putters as tasks
    task1 = asyncio.ensure_future(queue.put('B'))
    task2 = asyncio.ensure_future(queue.put('C'))
    await asyncio.sleep(0)
    # Make room in the queue, print 'A'
    print(queue.get_nowait())
    # Cancel task 1 before giving the control back to the event loop
    task1.cancel()
    # Thankfully, the putter in task 2 has been notified
    await task2
    # Print 'C'
    print(await queue.get())

编辑:关于内部发生的事情的更多信息:

  • queue.get_nowait():调用了putter.set_result(None);推杆状态现在是 FINISHEDtask1 将在控制权交还给事件循环时唤醒。
  • task1.cancel()task1._fut_waiter已经完成,所以task1._must_cancel设置为True,以便下次CancelledError[=18] =] 运行。
  • await task2:
    • 控制权交还给控制回路,task1._step()运行。一个CancelledError被扔进协程里面:task1._coro.throw(CancelledError()).
    • queue.put 捕获异常。由于队列未满且 'B' 不会被插入,因此必须通知队列中的下一个推杆:self._wakeup_next(self._putters)
    • 然后CancelledError被再次加注并陷入task1._step()task1 现在实际上取消了自身 (super().cancel())。