扭曲的 deferToThread 回调被覆盖

Twisted deferToThread callbacks overwritten

我是 twisted 的新手,正在尝试构建程序,但我总是遇到奇怪的行为

import sys
import json
import random
import time

from twisted.internet import threads, reactor
from twisted.python import log
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory


class DummyService():
    username = ''
    password = ''

    def __init__(self, username=username, password=password):
        print 'Connect to DummyService with user {} and password {}'.format(username, password)

    def slow_api_power(self, x):
        s = random.randint(1, 5)
        time.sleep(s)
        print "I am {} and my power is {}".format(x, x**2)

    def slow_go_to(self, x, lat, lng):
        s = random.randint(1, 5)
        time.sleep(s)
        print "I am {} and I am going to {} {}".format(x, lat, lng)


class DroneProtocol(WebSocketServerProtocol):

    # Persistent protocol state is kept in the factory.
    # Because a new instance of a protocol class is created for each connection,
    # protocols can't contain persistent state, that information must instead be stored in a protocol factory.

    def __init__(self, factory_protocol):
        self.factory = factory_protocol

    def onConnect(self, request):
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")

    def onMessage(self, payload, isBinary):
        if isBinary:
            print("Binary message received: {0} bytes".format(len(payload)))
        else:
            print("Text message received: {0}".format(payload.decode('utf8')))

        msg = json.loads(payload.decode('utf8'))

        if 'start' in msg['command']:
            for x in xrange(1, 4):
                self.factory.drones[x] = {
                    'name': x,
                    'instance': DummyService(username='user_' + str(x), password='password')
                }
                d = threads.deferToThread(self.factory.drones[x]['instance'].slow_api_power, x)
                d.addCallback(lambda ignore: self.factory.drones[x]['instance'].slow_go_to(x, lat=x+2, lng=x+4))


    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))


class DroneFactory(WebSocketServerFactory):

    drones = {}

    def buildProtocol(self, addr):
        return DroneProtocol(self)


if __name__ == '__main__':

    log.startLogging(sys.stdout)

    factory = DroneFactory()
    factory.protocol = DroneProtocol
    reactor.listenTCP(9999, DroneFactory())
    reactor.run()

我的是这样的:

2015-01-12 14:48:30-0300 [-] Log opened.
2015-01-12 14:48:30-0300 [-] DroneFactory starting on 9999
2015-01-12 14:48:30-0300 [-] Starting factory <__main__.DroneFactory instance at 0x102ed5dd0>
2015-01-12 14:48:35-0300 [DroneProtocol,0,127.0.0.1] Client connecting: 127.0.0.1:60970
2015-01-12 14:48:35-0300 [DroneProtocol,0,127.0.0.1] WebSocket connection open.
2015-01-12 14:48:35-0300 [DroneProtocol,0,127.0.0.1] Text message received: {"command": "start"}
2015-01-12 14:48:35-0300 [DroneProtocol,0,127.0.0.1] Connect to DummyService with user user_1 and password password
2015-01-12 14:48:35-0300 [DroneProtocol,0,127.0.0.1] Connect to DummyService with user user_2 and password password
2015-01-12 14:48:35-0300 [DroneProtocol,0,127.0.0.1] Connect to DummyService with user user_3 and password password
2015-01-12 14:48:35-0300 [DroneProtocol,0,127.0.0.1] WebSocket connection closed: connection was closed uncleanly (peer dropped the TCP connection without previous WebSocket closing handshake)
2015-01-12 14:48:38-0300 [DroneProtocol,0,127.0.0.1] I am 3 and my power is 9
2015-01-12 14:48:38-0300 [DroneProtocol,0,127.0.0.1]  I am 2 and my power is 4
2015-01-12 14:48:39-0300 [-] I am 3 and I am going to 5 7
2015-01-12 14:48:40-0300 [DroneProtocol,0,127.0.0.1] I am 1 and my power is 1
2015-01-12 14:48:42-0300 [-] I am 3 and I am going to 5 7
2015-01-12 14:48:44-0300 [-] I am 3 and I am going to 5 7

我不明白为什么只执行最后一个回调。我有不同的 Deferred,存储在字典中,为什么它们不分开?我的设计缺陷在哪里?

您误解了作用域和嵌套函数的工作原理。考虑:

>>> def foo(x):
...     print x
... 
>>> L = []
>>> for i in range(3):
...     L.append(lambda: foo(i))
... 
>>> L[0]()
2
>>> L[1]()
2
>>> L[2]()
2
>>> 

使用此代码解决

        if 'start' in msg['command']:
            for x in xrange(1, 4):
                self.factory.drones[x] = {
                    'name': x,
                    'instance': DummyService(username='user_' + str(x), password='password')
                }
                d = threads.deferToThread(self.factory.drones[x]['instance'].slow_api_power, x)
                d.addCallback(lambda ignore, p=x: self.factory.drones[p]['instance'].slow_go_to(p, lat=p+2, lng=p+4))

关键是要跟踪当前的x。 另见 http://docs.python-guide.org/en/latest/writing/gotchas/#late-binding-closures