异步、非阻塞地处理来自高速公路订阅的消息

Process messages from autobahn Subscriptions asynchronously, non-blocking

我在 docker 容器中有一个 python "Device" 运行。它连接到 Crossbar 路由器,在订阅的频道上接收 autobahn/WAMP 事件消息。

发布某个事件时,我的设备正在调用一个将在几秒钟内完成的方法。 现在,我希望它跳过或处理接收到的同一事件的任何消息,而该方法仍然是 运行。我试图通过使用 Twisted 的 @inlinecallback 装饰器并在设备上设置 "self.busy"-flag 来实现这一点。

但它不会立即延迟返回,而是表现得像一种正常的阻塞方法,因此传入的消息会一个接一个地处理。

这是我的代码:

from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks

class Pixel(ApplicationSession):

@inlineCallbacks
def onJoin(self, details):
    yield self.subscribe(self.handler_no_access, 'com.event.no_access')

@inlineCallbacks
def handler_no_access(self, direction):
    entries = len(self.handlers['no_access'][direction])

    if entries == 0:
        self.handlers['no_access'][direction].append(direction)
        result = yield self._handler_no_access()
        return result

    else:
        yield print('handler_no_access: entries not 0: ', self.handlers['no_access'])

@inlineCallbacks
def _handler_no_access(self):
    for direction in self.handlers['no_access']:

        for message in self.handlers['no_access'][direction]:
            yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5)
            self.handlers['no_access'][direction] = []

顺便说一句,我已经用 self.handler 字典走了一条老路。

编辑

拦截方式为:

yield self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5)

在树莓派的GPIO上控制一个Neopixel,让它闪烁1s。对该方法的任何进一步调用

def handler_no_access(self, direction)

当 _timed_switch 还没有完成时,将被跳过,所以它们不会叠加。

解决方案

@inlineCallbacks
def handler_no_access(self, direction):
    direction = str(direction)

    if self.busy[direction] is False:

        self.busy[direction] = True

        # non-blocking now
        yield deferToThread(self._handler_no_access, direction)

    else:
        yield print('handler_no_access: direction {} busy '.format(direction))

def _handler_no_access(self, direction):

    # this takes 1s to execute
    self._timed_switch(self.direction_leds[direction], 'red', 0.2, 5)

    self.busy[direction] = False

inlineCallbacks 不会将阻塞代码变成非阻塞代码。它只是使用 Deferreds 的替代 API。延迟只是管理回调的一种方式。

您需要重写您的阻塞代码以通过其他方式实现非阻塞。您实际上并没有说明您的代码的哪一部分正在阻塞,也没有说明它阻塞了什么,因此很难建议您如何执行此操作。将阻塞代码变为非阻塞的仅有的两个通用工具是线程和进程。因此,您可以 运行 在单独的线程或进程中使用该函数。该函数可能会或可能不会在这样的执行上下文中工作(同样,如果不知道它到底做了什么,就无法知道)。