在发送 autoPing 之前,`callInThread` 似乎不会触发 WAMP 订阅

`callInThread` does not appear to trigger WAMP Subscribe until an autoPing is sent

当我使用 reactor.callInThread 触发一些代码时,反应堆似乎不会触发,直到发生其他一些预定事件(在本例中为自动 ping)。

根据自动 ping 与我的订阅请求对齐的时间,我可以看到 0 到 5 秒之间的响应时间。手动编辑 autobahn/twisted/wamp.py 以将 transport_factory.setProtocolOptions( ..., autoPingInterval=10., ...) 更改为亚秒级提供新的上限。

I 运行 tcpdump 和下面的代码(同时在同一个终端中 - 为清楚起见缩进 tcpdump 输出)并得到如下输出。请注意,tcpdump 表示所有消息都是同时发送的,并且服务器和客户端都不会延迟响应(这只是第一个客户端发送,距离初始触发/日志消息很远):

2017-10-30T16:26:44-0700 Auto ping/pong: sending ping auto-ping/pong
2017-10-30T16:26:44-0700 Expecting ping in 5.0 seconds for auto-ping/pong
2017-10-30T16:26:44-0700 Auto ping/pong: received pending pong for auto-ping/pong
2017-10-30T16:26:44-0700 WebSocketProtocol.onPong(payload=<4 bytes>)
    16:26:44.000880 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2772:2811, ack 8850, win 4096, options [nop,nop,TS val 826034469 ecr 12606734], length 39
    16:26:44.004235 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8850:8885, ack 2811, win 285, options [nop,nop,TS val 12613555 ecr 826034469], length 35
    16:26:44.004282 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8885, win 4094, options [nop,nop,TS val 826034472 ecr 12613555], length 0
2017-10-30T16:26:44-0700 WAMP SEND: message=Subscribe(XXX)
    <-------------------->
      Five Seconds Pass
    <-------------------->
2017-10-30T16:26:49-0700 WAMP RECV: message=Subscribed(XXX)
2017-10-30T16:26:49-0700 WAMP SEND: message=Call(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Result(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700 
    16:26:49.000617 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2811:2884, ack 8885, win 4096, options [nop,nop,TS val 826039448 ecr 12613555], length 73
    16:26:49.004748 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8885:8939, ack 2884, win 285, options [nop,nop,TS val 12618555 ecr 826039448], length 54
    16:26:49.004797 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8939, win 4094, options [nop,nop,TS val 826039452 ecr 12618555], length 0
    16:26:49.006799 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2884:3537, ack 8939, win 4096, options [nop,nop,TS val 826039454 ecr 12618555], length 653
    16:26:49.009960 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8939:9000, ack 3537, win 299, options [nop,nop,TS val 12618561 ecr 826039454], length 61
    16:26:49.010004 IP CLIENT.58323 > SERVER.443: Flags [.], ack 9000, win 4094, options [nop,nop,TS val 826039457 ecr 12618561], length 0
    16:26:49.171613 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 9000:10329, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1329
    16:26:49.171616 IP SERVER.443 > CLIENT.58323: Flags [.], seq 10329:11777, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1448
    16:26:49.171618 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 11777:11857, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 80
    16:26:49.171663 IP CLIENT.58323 > SERVER.443: Flags [.], ack 10329, win 4054, options [nop,nop,TS val 826039617 ecr 12618723], length 0
    16:26:49.171678 IP CLIENT.58323 > SERVER.443: Flags [.], ack 11857, win 4006, options [nop,nop,TS val 826039617 ecr 12618723], length 0
2017-10-30T16:26:50-0700 Result: XXX (4.99s)
2017-10-30T16:26:50-0700
2017-10-30T16:26:50-0700 Sleeping

下面的示例重现了针对我的本地服务器的问题 - 从原始代码中大量删减,但仍然有一些值得注意的问题: * 使用 SSL(需要,很高兴被告知更好的技术) * 在一个线程中(~需要,在库中的派生进程中使用以进行交互) * 使用 config.extra 进行状态共享(很高兴得知更好的技术)

考虑要编辑但不相关的大写值。

import autobahn                                                                                                                                    
assert autobahn.__version__ == '17.6.2'                                                                                                            

class WampSession(ApplicationSession):                                                                                                             

    @inlineCallbacks                                                                                                                               
    def do_work(self, args):                                                                                                                       
        self.config.extra['joined'].wait()                                                                                                         
        yield self.subscribe(self._get_onResult(), args['resultTopic'])                                                                            
        args['authentication'] = self.config.extra['authentication']                                                                               
        yield self.call(FUNC, **args)                                                                                                              

    def _get_onResult(self):                                                                                                                       
        def onResult(**kw):
            if kw['status'] == 'done':
                self.config.extra['result_queue'].put(kw['requestID'])                                                                             
        return onResult

    def onConnect(self):
        self.join(REALM, authmethods=[u'ticket'])                                                                                                  

    def onJoin(self, details):                                                                                                                     
        self.config.extra['joined'].set()                                                                                                                                                                                                  

    def onChallenge(self, challenge):                                                                                                              
        return self.config.extra['authentication']                                                                                                 

    def onDisconnect(self):                                                                                                                        
        log.debug('onDisconnect')
        reactor.stop()


def get_session_runner():                                                                                                                          
    extra = {
        'result_queue': Queue.Queue(),                                                                                                             
        'joined': threading.Event(),                                                                                                               
        'authentication': ACCESS_TOKEN,                                                                                                            
    }     
    session = WampSession(ComponentConfig(extra=extra))                                                                                            
    certificate = PrivateCertificate.loadPEM(CERT_DATA)                                                                                            
    ssl = certificate.options()                                                                                                                    
    runner = ApplicationRunner(url=URL, ssl=ssl)                                                                                                   
    return session, runner                                                                                                                         


def main():
    session, runner = get_session_runner()
    # Monkeypatch this to make it run in a thread, since ApplicationRunner                                                                         
    # calls 'run' without arguments, and we need to skip signal handlers.                                                                          
    def _run_threaded():
        return reactor._run_original(installSignalHandlers=False)                                                                                  

    reactor._run_original = reactor.run
    reactor.run = _run_threaded
    t = threading.Thread(target=runner.run, args=(session,))                                                                                       
    t.daemon = True                                                                                                                                
    t.start()

    orig = FUNC_ARGS['resultTopic']
    for i in xrange(100):                                                                                                                          
        FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)                                                                                              
        start = time.time()                                                                                                                        
        reactor.callInThread(session.do_work, FUNC_ARGS)                                                                                           
        result = session.config.extra['result_queue'].get()                                                                                        
        end = time.time()
        print
        time.sleep(1)  # minisleep for tcpdump logging delay                                                                                       
        print 'Result: %s (%.2fs)' % (result, end - start)                                                                                         
        print '\nSleeping\n'
        time.sleep(6)
        print '\nDone Sleeping\n'

这似乎在根据 Jean-Paul 的回答进行修改后运行良好(将调用移至线程而不是反应堆,使用 callFromThread):

def main():
    session, runner = get_session_runner()
    t = threading.Thread(target=threadstuff, args=(session,))
    t.start()
    runner.run(session)


def threadstuff(session):
    orig = FUNC_ARGS['resultTopic']
    session.config.extra['joined'].wait()
    for i in xrange(100):
        FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
        start = time.time()
        reactor.callFromThread(session.do_work, FUNC_ARGS)
        result = session.config.extra['result_queue'].get()
        end = time.time()
        print
        time.sleep(1)  # minisleep for tcpdump logging delay
        print 'Result: %s (%.2fs)' % (result, end - start)
        print '\nSleeping\n'
        time.sleep(6)
        print '\nDone Sleeping\n'
    reactor.stop()

callInThread 是调度 API,它允许您从反应堆的线程池调用线程中的给定函数。它必须从反应器线程调用。

callFromThread 是调度 API,它允许您在反应器线程中调用给定函数。它通常从非反应器线程调用。

您应该在此处使用 callFromThread,因为您正在尝试从非 Reactor 线程安排工作。