仅当我使用异步数据库操作时,Twisted 才不会发回数据

Twisted will not send data back only if I use async DB ops

在与 twisted/txredisapi 的 inlineCallbacks 和 yield 斗争之后,我终于可以将数据保存到 redis 中了。感谢 txredisapi 的作者。现在我遇到了一个新问题:无论是在保存到数据库之前还是之后,套接字服务器都不会把数据发回客户端。

Twisted 提供如下简单的套接字服务器:

from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
    """Protocol that writes every received chunk back to its transport."""

    def dataReceived(self, data):
        # Echo: hand the received data straight back to the peer.
        transport = self.transport
        transport.write(data)

class EchoFactory(protocol.Factory):
    """Factory whose buildProtocol returns a fresh Echo instance."""

    def buildProtocol(self, addr):
        # addr is accepted but not used; every call gets a new protocol.
        echo_protocol = Echo()
        return echo_protocol

# listenTCP needs a factory *instance*, so EchoFactory must be called here.
reactor.listenTCP(8000, EchoFactory())
# Start the event loop (the original spelled the name "recctor", a NameError).
reactor.run()

我的代码很相似,只是增加了额外的数据库操作。

#!/usr/bin/env python

import time
import binascii
import txredisapi

from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory
from twisted.enterprise import adbapi
from twisted.python import log

from dmpack import Dmpack
from dmdb import Dmdb
from dmconfig import DmConf

# Module-level objects shared by the functions and protocol defined below.
dm = Dmpack()
conf = DmConf().loadConf()

# Redis handle, authenticated with the password from the loaded configuration.
rcs = txredisapi.lazyConnection(password=conf['RedisPassword'])

# MySQLdb-backed connection pool; all settings come from the configuration.
dbpool = adbapi.ConnectionPool(
    "MySQLdb",
    db=conf['DbName'],
    user=conf['DbAccount'],
    passwd=conf['DbPassword'],
    host=conf['DbHost'],
    use_unicode=True,
    charset=conf['DbCharset']
)

def getDataParsed(data):
    """Split a raw packet into three parts.

    Returns a tuple (snrCode, realtime, period) where:
      * snrCode  is dm.snrToAscii() applied to data[2:7],
      * realtime is data[7:167]  (the caller saves it into Redis),
      * period   is data[167:-2] (the caller saves it into SQL).
    """
    # This is a plain function, not a method: there is no `self` here, so the
    # code is kept in a local variable and handed back to the caller.
    snrCode = dm.snrToAscii(data[2:7])
    realtime = data[7:167]
    period = data[167:-2]
    return (snrCode, realtime, period)

class PlainTCP(protocol.Protocol):
    def __init__(self, factory):
        self.factory = factory
        self.factory.numConnections = 0
        self.snrCode = None 
        self.rData = None
        self.pData = None
        self.err = None

    def connectionMade(self):
        self.factory.numConnections += 1
        print "Nr. of connections: %d\n" %(self.factory.numConnections)
        self.transport.write("Hello remote\r\n") # it only prints very 5 connections.

    def connectionLost(self, reason):
        self.factory.numConnections -= 1
        print "Nr. of connections: %d\n" %(self.factory.numConnections)

    @defer.inlineCallbacks
    def dataReceived(self, data):
        global dbpool, rcs
        (self.snrCode,rDat,pDat) = getDataParsed(data)

        if self.snrCode == None or rDat == None or pDat == None:
            err = "Bad format"
        else:
            err = "OK"
        print "err:%s"%(err) # debug print to show flow control
        self.err = err 

        self.transport.write(self.snrCode)
        self.transport.write(self.err)
        self.transport.write(rDat)
        self.transport.write(pDat) 
        self.transport.loseConnection()

        if self.snrCode != None and rDat != None and pDat != None:    
            res = yield self.saveRealTimeData(rcs, rDat)        
            res = yield self.savePeriodData(dbpool, pDat, conf)

        print "err2:%s"%(err)  # debug print to show flow control


    @defer.inlineCallbacks
    def saveRealTimeData(self, rc, dat):
        key = "somekey"
        val = "somedata"
        yield rc.set(key,val)
        yield rc.expire(key,30)

    @defer.inlineCallbacks
    def savePeriodData(self,rc,dat,conf):
        query = "some SQL statement"
        yield rc.runQuery(query)

class PlainTCPFactory(protocol.Factory):
    """Factory whose buildProtocol returns a PlainTCP bound to this factory."""

    def buildProtocol(self, addr):
        # addr is accepted but not used; the protocol receives the factory
        # itself as its constructor argument.
        return PlainTCP(factory=self)

def main():
    dmdb = Dmdb()
    if not dmdb.detectDb():
        print "Please run MySQL RDBS first."
        sys.exit()

    log.startLogging(sys.stdout)

    reactor.listenTCP(8080, PlainTCPFactory())
    reactor.run()

if __name__ == "__main__":
    main()

下面是我的客户端代码片段,这是一个简单的客户端:

def connectSend(host="127.0.0.1",port=8080):
    global packet
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((host, port))
        s.sendall(''.join(packet))
        data = s.recv(1024)
        s.close()
        print 'Received', repr(data)
    except socket.error, err:
        print "Remote socket is not available: %s"%str(err)
        sys.exit(1)

当前状态为:

我被告知 dataReceived() 不应该是 @defer.inlineCallbacks。但是,如果我删除了那个装饰,它不会改变任何东西。

我在想 gevent 是否可以帮助我摆脱这种不可预测的行为。我被卷进了无尽的龙卷风,被"twisted"(扭曲)了.....

请有类似经验的朋友帮帮我。谢谢

通过如下更改函数,代码可以正常工作。

#COMMENT OUT decorator of @defer.inlineCallbacks

def dataReceived(self, data):
    global dbpool, rcs
    (self.snrCode,rDat,pDat) = getDataParsed(data)

    if self.snrCode == None or rDat == None or pDat == None:
        err = "Bad format"
    else:
        err = "OK"
    print "err:%s"%(err) # debug print to show flow control
    self.err = err 

    self.transport.write(self.snrCode)
    self.transport.write(self.err)
    self.transport.write(rDat)
    self.transport.write(pDat) 
    self.transport.loseConnection()

    if self.snrCode != None and rDat != None and pDat != None:    
        self.saveRealTimeData(rcs, rDat)        
        self.savePeriodData(dbpool, pDat, conf)
        # Removing yield before DB ops

    print "err2:%s"%(err)  # debug print to show flow control


@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
    print "saveRedis"
    key = "somekey"
    val = "somedata"
    yield rc.set(key,val)
    yield rc.expire(key,30)

@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
    print "save SQL"
    query = "some SQL statement"
    yield rc.runQuery(query)

如果我们保留 @defer.inlineCallbacks 并在 dataReceived 中使用 yield,连接会在第二个数据库操作之前关闭,因此没有数据输出到连接。这可能是由 inlineCallbacks 装饰器引起的。

去掉这个装饰器,控制流程就简单明了。

但是,既然有两个 deferred 的数据库操作,我仍然无法理解为什么不能添加 inlineCallbacks。难道这次它们不需要 deferred 了吗?