在发送 autoPing 之前,`callInThread` 似乎不会触发 WAMP 订阅
`callInThread` does not appear to trigger WAMP Subscribe until an autoPing is sent
当我使用 reactor.callInThread
触发一些代码时,反应堆似乎不会触发,直到发生其他一些预定事件(在本例中为自动 ping)。
根据自动 ping 与我的订阅请求对齐的时间,我可以看到 0 到 5 秒之间的响应时间。手动编辑 autobahn/twisted/wamp.py
以将 transport_factory.setProtocolOptions( ..., autoPingInterval=10., ...)
更改为亚秒级提供新的上限。
I 运行 tcpdump 和下面的代码(同时在同一个终端中 - 为清楚起见缩进 tcpdump 输出)并得到如下输出。请注意,tcpdump 表示所有消息都是同时发送的,并且服务器和客户端都不会延迟响应(这只是第一个客户端发送,距离初始触发/日志消息很远):
2017-10-30T16:26:44-0700 Auto ping/pong: sending ping auto-ping/pong
2017-10-30T16:26:44-0700 Expecting ping in 5.0 seconds for auto-ping/pong
2017-10-30T16:26:44-0700 Auto ping/pong: received pending pong for auto-ping/pong
2017-10-30T16:26:44-0700 WebSocketProtocol.onPong(payload=<4 bytes>)
16:26:44.000880 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2772:2811, ack 8850, win 4096, options [nop,nop,TS val 826034469 ecr 12606734], length 39
16:26:44.004235 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8850:8885, ack 2811, win 285, options [nop,nop,TS val 12613555 ecr 826034469], length 35
16:26:44.004282 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8885, win 4094, options [nop,nop,TS val 826034472 ecr 12613555], length 0
2017-10-30T16:26:44-0700 WAMP SEND: message=Subscribe(XXX)
<-------------------->
Five Seconds Pass
<-------------------->
2017-10-30T16:26:49-0700 WAMP RECV: message=Subscribed(XXX)
2017-10-30T16:26:49-0700 WAMP SEND: message=Call(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Result(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700
16:26:49.000617 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2811:2884, ack 8885, win 4096, options [nop,nop,TS val 826039448 ecr 12613555], length 73
16:26:49.004748 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8885:8939, ack 2884, win 285, options [nop,nop,TS val 12618555 ecr 826039448], length 54
16:26:49.004797 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8939, win 4094, options [nop,nop,TS val 826039452 ecr 12618555], length 0
16:26:49.006799 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2884:3537, ack 8939, win 4096, options [nop,nop,TS val 826039454 ecr 12618555], length 653
16:26:49.009960 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8939:9000, ack 3537, win 299, options [nop,nop,TS val 12618561 ecr 826039454], length 61
16:26:49.010004 IP CLIENT.58323 > SERVER.443: Flags [.], ack 9000, win 4094, options [nop,nop,TS val 826039457 ecr 12618561], length 0
16:26:49.171613 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 9000:10329, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1329
16:26:49.171616 IP SERVER.443 > CLIENT.58323: Flags [.], seq 10329:11777, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1448
16:26:49.171618 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 11777:11857, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 80
16:26:49.171663 IP CLIENT.58323 > SERVER.443: Flags [.], ack 10329, win 4054, options [nop,nop,TS val 826039617 ecr 12618723], length 0
16:26:49.171678 IP CLIENT.58323 > SERVER.443: Flags [.], ack 11857, win 4006, options [nop,nop,TS val 826039617 ecr 12618723], length 0
2017-10-30T16:26:50-0700 Result: XXX (4.99s)
2017-10-30T16:26:50-0700
2017-10-30T16:26:50-0700 Sleeping
下面的示例重现了针对我的本地服务器的问题 - 从原始代码中大量删减,但仍然有一些值得注意的问题:
* 使用 SSL(需要,很高兴被告知更好的技术)
* 在一个线程中(~需要,在库中的派生进程中使用以进行交互)
* 使用 config.extra 进行状态共享(很高兴得知更好的技术)
考虑要编辑但不相关的大写值。
import autobahn
assert autobahn.__version__ == '17.6.2'
class WampSession(ApplicationSession):
@inlineCallbacks
def do_work(self, args):
self.config.extra['joined'].wait()
yield self.subscribe(self._get_onResult(), args['resultTopic'])
args['authentication'] = self.config.extra['authentication']
yield self.call(FUNC, **args)
def _get_onResult(self):
def onResult(**kw):
if kw['status'] == 'done':
self.config.extra['result_queue'].put(kw['requestID'])
return onResult
def onConnect(self):
self.join(REALM, authmethods=[u'ticket'])
def onJoin(self, details):
self.config.extra['joined'].set()
def onChallenge(self, challenge):
return self.config.extra['authentication']
def onDisconnect(self):
log.debug('onDisconnect')
reactor.stop()
def get_session_runner():
extra = {
'result_queue': Queue.Queue(),
'joined': threading.Event(),
'authentication': ACCESS_TOKEN,
}
session = WampSession(ComponentConfig(extra=extra))
certificate = PrivateCertificate.loadPEM(CERT_DATA)
ssl = certificate.options()
runner = ApplicationRunner(url=URL, ssl=ssl)
return session, runner
def main():
session, runner = get_session_runner()
# Monkeypatch this to make it run in a thread, since ApplicationRunner
# calls 'run' without arguments, and we need to skip signal handlers.
def _run_threaded():
return reactor._run_original(installSignalHandlers=False)
reactor._run_original = reactor.run
reactor.run = _run_threaded
t = threading.Thread(target=runner.run, args=(session,))
t.daemon = True
t.start()
orig = FUNC_ARGS['resultTopic']
for i in xrange(100):
FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
start = time.time()
reactor.callInThread(session.do_work, FUNC_ARGS)
result = session.config.extra['result_queue'].get()
end = time.time()
print
time.sleep(1) # minisleep for tcpdump logging delay
print 'Result: %s (%.2fs)' % (result, end - start)
print '\nSleeping\n'
time.sleep(6)
print '\nDone Sleeping\n'
这似乎在根据 Jean-Paul 的回答进行修改后运行良好(将调用移至线程而不是反应堆,使用 callFromThread
):
def main():
session, runner = get_session_runner()
t = threading.Thread(target=threadstuff, args=(session,))
t.start()
runner.run(session)
def threadstuff(session):
orig = FUNC_ARGS['resultTopic']
session.config.extra['joined'].wait()
for i in xrange(100):
FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
start = time.time()
reactor.callFromThread(session.do_work, FUNC_ARGS)
result = session.config.extra['result_queue'].get()
end = time.time()
print
time.sleep(1) # minisleep for tcpdump logging delay
print 'Result: %s (%.2fs)' % (result, end - start)
print '\nSleeping\n'
time.sleep(6)
print '\nDone Sleeping\n'
reactor.stop()
callInThread
是调度 API,它允许您从反应堆的线程池调用线程中的给定函数。它必须从反应器线程调用。
callFromThread
是调度 API,它允许您在反应器线程中调用给定函数。它通常从非反应器线程调用。
您应该在此处使用 callFromThread
,因为您正在尝试从非 Reactor 线程安排工作。
当我使用 reactor.callInThread
触发一些代码时,反应堆似乎不会触发,直到发生其他一些预定事件(在本例中为自动 ping)。
根据自动 ping 与我的订阅请求对齐的时间,我可以看到 0 到 5 秒之间的响应时间。手动编辑 autobahn/twisted/wamp.py
以将 transport_factory.setProtocolOptions( ..., autoPingInterval=10., ...)
更改为亚秒级提供新的上限。
I 运行 tcpdump 和下面的代码(同时在同一个终端中 - 为清楚起见缩进 tcpdump 输出)并得到如下输出。请注意,tcpdump 表示所有消息都是同时发送的,并且服务器和客户端都不会延迟响应(这只是第一个客户端发送,距离初始触发/日志消息很远):
2017-10-30T16:26:44-0700 Auto ping/pong: sending ping auto-ping/pong
2017-10-30T16:26:44-0700 Expecting ping in 5.0 seconds for auto-ping/pong
2017-10-30T16:26:44-0700 Auto ping/pong: received pending pong for auto-ping/pong
2017-10-30T16:26:44-0700 WebSocketProtocol.onPong(payload=<4 bytes>)
16:26:44.000880 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2772:2811, ack 8850, win 4096, options [nop,nop,TS val 826034469 ecr 12606734], length 39
16:26:44.004235 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8850:8885, ack 2811, win 285, options [nop,nop,TS val 12613555 ecr 826034469], length 35
16:26:44.004282 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8885, win 4094, options [nop,nop,TS val 826034472 ecr 12613555], length 0
2017-10-30T16:26:44-0700 WAMP SEND: message=Subscribe(XXX)
<-------------------->
Five Seconds Pass
<-------------------->
2017-10-30T16:26:49-0700 WAMP RECV: message=Subscribed(XXX)
2017-10-30T16:26:49-0700 WAMP SEND: message=Call(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Result(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700 WAMP RECV: message=Event(XXX)
2017-10-30T16:26:49-0700
16:26:49.000617 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2811:2884, ack 8885, win 4096, options [nop,nop,TS val 826039448 ecr 12613555], length 73
16:26:49.004748 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8885:8939, ack 2884, win 285, options [nop,nop,TS val 12618555 ecr 826039448], length 54
16:26:49.004797 IP CLIENT.58323 > SERVER.443: Flags [.], ack 8939, win 4094, options [nop,nop,TS val 826039452 ecr 12618555], length 0
16:26:49.006799 IP CLIENT.58323 > SERVER.443: Flags [P.], seq 2884:3537, ack 8939, win 4096, options [nop,nop,TS val 826039454 ecr 12618555], length 653
16:26:49.009960 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 8939:9000, ack 3537, win 299, options [nop,nop,TS val 12618561 ecr 826039454], length 61
16:26:49.010004 IP CLIENT.58323 > SERVER.443: Flags [.], ack 9000, win 4094, options [nop,nop,TS val 826039457 ecr 12618561], length 0
16:26:49.171613 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 9000:10329, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1329
16:26:49.171616 IP SERVER.443 > CLIENT.58323: Flags [.], seq 10329:11777, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 1448
16:26:49.171618 IP SERVER.443 > CLIENT.58323: Flags [P.], seq 11777:11857, ack 3537, win 299, options [nop,nop,TS val 12618723 ecr 826039457], length 80
16:26:49.171663 IP CLIENT.58323 > SERVER.443: Flags [.], ack 10329, win 4054, options [nop,nop,TS val 826039617 ecr 12618723], length 0
16:26:49.171678 IP CLIENT.58323 > SERVER.443: Flags [.], ack 11857, win 4006, options [nop,nop,TS val 826039617 ecr 12618723], length 0
2017-10-30T16:26:50-0700 Result: XXX (4.99s)
2017-10-30T16:26:50-0700
2017-10-30T16:26:50-0700 Sleeping
下面的示例重现了针对我的本地服务器的问题 - 从原始代码中大量删减,但仍然有一些值得注意的问题: * 使用 SSL(需要,很高兴被告知更好的技术) * 在一个线程中(~需要,在库中的派生进程中使用以进行交互) * 使用 config.extra 进行状态共享(很高兴得知更好的技术)
考虑要编辑但不相关的大写值。
import autobahn
assert autobahn.__version__ == '17.6.2'
class WampSession(ApplicationSession):
@inlineCallbacks
def do_work(self, args):
self.config.extra['joined'].wait()
yield self.subscribe(self._get_onResult(), args['resultTopic'])
args['authentication'] = self.config.extra['authentication']
yield self.call(FUNC, **args)
def _get_onResult(self):
def onResult(**kw):
if kw['status'] == 'done':
self.config.extra['result_queue'].put(kw['requestID'])
return onResult
def onConnect(self):
self.join(REALM, authmethods=[u'ticket'])
def onJoin(self, details):
self.config.extra['joined'].set()
def onChallenge(self, challenge):
return self.config.extra['authentication']
def onDisconnect(self):
log.debug('onDisconnect')
reactor.stop()
def get_session_runner():
extra = {
'result_queue': Queue.Queue(),
'joined': threading.Event(),
'authentication': ACCESS_TOKEN,
}
session = WampSession(ComponentConfig(extra=extra))
certificate = PrivateCertificate.loadPEM(CERT_DATA)
ssl = certificate.options()
runner = ApplicationRunner(url=URL, ssl=ssl)
return session, runner
def main():
session, runner = get_session_runner()
# Monkeypatch this to make it run in a thread, since ApplicationRunner
# calls 'run' without arguments, and we need to skip signal handlers.
def _run_threaded():
return reactor._run_original(installSignalHandlers=False)
reactor._run_original = reactor.run
reactor.run = _run_threaded
t = threading.Thread(target=runner.run, args=(session,))
t.daemon = True
t.start()
orig = FUNC_ARGS['resultTopic']
for i in xrange(100):
FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
start = time.time()
reactor.callInThread(session.do_work, FUNC_ARGS)
result = session.config.extra['result_queue'].get()
end = time.time()
print
time.sleep(1) # minisleep for tcpdump logging delay
print 'Result: %s (%.2fs)' % (result, end - start)
print '\nSleeping\n'
time.sleep(6)
print '\nDone Sleeping\n'
这似乎在根据 Jean-Paul 的回答进行修改后运行良好(将调用移至线程而不是反应堆,使用 callFromThread
):
def main():
session, runner = get_session_runner()
t = threading.Thread(target=threadstuff, args=(session,))
t.start()
runner.run(session)
def threadstuff(session):
orig = FUNC_ARGS['resultTopic']
session.config.extra['joined'].wait()
for i in xrange(100):
FUNC_ARGS['resultTopic'] = orig + ('.%d' % i)
start = time.time()
reactor.callFromThread(session.do_work, FUNC_ARGS)
result = session.config.extra['result_queue'].get()
end = time.time()
print
time.sleep(1) # minisleep for tcpdump logging delay
print 'Result: %s (%.2fs)' % (result, end - start)
print '\nSleeping\n'
time.sleep(6)
print '\nDone Sleeping\n'
reactor.stop()
callInThread
是调度 API,它允许您从反应堆的线程池调用线程中的给定函数。它必须从反应器线程调用。
callFromThread
是调度 API,它允许您在反应器线程中调用给定函数。它通常从非反应器线程调用。
您应该在此处使用 callFromThread
,因为您正在尝试从非 Reactor 线程安排工作。