Python 的 asyncio.Event() 跨越不同的 类

Python's asyncio.Event() across different classes

我正在编写一个 Python 程序来与基于 CAN 总线的设备进行交互。为此,我成功地使用了 python-can 模块。我还使用 asyncio 来响应异步事件。我写了一个“CanBusManager”class,供“CanBusSequencer”class 使用。 “CanBusManager”class 负责处理 generating/sending/receiving 消息,而 CanBusSequencer 驱动要发送的消息序列。
在序列中的某个点,我想等到收到特定消息以“解锁”序列中要发送的剩余消息。代码概述:

main.py

async def main():
   
   event = asyncio.Event()
   sequencer = CanBusSequencer(event)
   task = asyncio.create_task(sequencer.doSequence())
   await task
 
asyncio.run(main(), debug=True)

canBusSequencer.py

from canBusManager import CanBusManager

class CanBusSequencer:
 
   def __init__(self, event)
 
      self.event = event
      self.canManager = CanBusManager(event)

   async def doSequence(self):
 
      for index, row in self.df_sequence.iterrows():
         if:...
            self.canManager.sendMsg(...)
         else:
            self.canManager.sendMsg(...)
            await self.event.wait()
            self.event.clear()

canBusManager.py

import can

class CanBusManager():
 
   def __init__(self, event):
 
      self.event = event
      self.startListening()
 
 **EDIT**
    def startListening(self):
    
       self.msgNotifier = can.Notifier(self.canBus, self.receivedMsgCallback)
 **EDIT**
 
   def receivedMsgCallback(self, msg):
 
      if(msg == ...):
         self.event.set()
   

现在我的程序停留在await self.event.wait(),即使接收到相关消息并执行了self.event.set()。 运行 debug = True 的程序揭示了一个

RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one

我不太明白。它与 asyncio 事件循环有关,以某种方式不正确 defined/managed。我来自 C++ 世界,目前正在使用 Python 编写我的第一个大型程序。任何指导将不胜感激:)

您的问题没有解释您如何安排 receivedMsgCallback 被调用。

如果它由在幕后使用线程的经典“异步”API 调用,那么它将从运行事件循环的线程外部调用。根据 the documentation,asyncio 原语 不是线程安全的 ,因此从另一个线程调用 event.set() 无法与 运行 事件循环正确同步,这就是为什么您的程序没有在应该唤醒的时候唤醒。

如果您想从事件循环线程外部执行任何与异步相关的操作,例如调用 Event.set,您需要使用 call_soon_threadsafe 或等效项。例如:

    def receivedMsgCallback(self, msg):
        if msg == ...:
            self.loop.call_soon_threadsafe(self.event.set)

事件循环对象应该可供 CanBusManager 对象使用,也许可以通过将其传递给其构造函数并将其分配给 self.loop.

附带说明一下,如果您创建任务只是为了立即等待它,那么您一开始就不需要任务。换句话说,您可以将 task = asyncio.create_task(sequencer.doSequence()); await task 替换为更简单的 await sequencer.doSequence().