仅当我使用异步数据库操作时,Twisted 才不会发回数据
Twisted will not send data back only if I use async DB ops
在与 inlineCallbacks 和 yield of twisted/txredisapi 斗争之后,我可以将数据保存到 redis 中。感谢 txredisapi 的作者。现在我遇到了一个新问题,套接字服务器不会发送回客户端 before/after 保存到数据库中。
Twisted 提供如下简单的套接字服务器:
from twisted.internet import protocol, reactor
class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data) ### write back
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return Echo()
reactor.listenTCP(8000, EchoFactory)
recctor.run()
我的代码很相似,只是增加了额外的数据库操作。
#!/usr/bin/env python
import time
import binascii
import txredisapi
from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory
from twisted.enterprise import adbapi
from twisted.python import log
from dmpack import Dmpack
from dmdb import Dmdb
from dmconfig import DmConf
dm = Dmpack()
conf = DmConf().loadConf()
rcs = txredisapi.lazyConnection(password=conf['RedisPassword'])
dbpool = adbapi.ConnectionPool("MySQLdb",db=conf['DbName'],user=conf['DbAccount'],\
passwd=conf['DbPassword'],host=conf['DbHost'],\
use_unicode=True,charset=conf['DbCharset'])
def getDataParsed(data):
realtime = None
period = None
self.snrCode = dm.snrToAscii(data[2:7])
realtime = data[7:167] # save it into redis
period = data[167:-2] # save it into SQL
return (snrCode, realtime, period)
class PlainTCP(protocol.Protocol):
def __init__(self, factory):
self.factory = factory
self.factory.numConnections = 0
self.snrCode = None
self.rData = None
self.pData = None
self.err = None
def connectionMade(self):
self.factory.numConnections += 1
print "Nr. of connections: %d\n" %(self.factory.numConnections)
self.transport.write("Hello remote\r\n") # it only prints very 5 connections.
def connectionLost(self, reason):
self.factory.numConnections -= 1
print "Nr. of connections: %d\n" %(self.factory.numConnections)
@defer.inlineCallbacks
def dataReceived(self, data):
global dbpool, rcs
(self.snrCode,rDat,pDat) = getDataParsed(data)
if self.snrCode == None or rDat == None or pDat == None:
err = "Bad format"
else:
err = "OK"
print "err:%s"%(err) # debug print to show flow control
self.err = err
self.transport.write(self.snrCode)
self.transport.write(self.err)
self.transport.write(rDat)
self.transport.write(pDat)
self.transport.loseConnection()
if self.snrCode != None and rDat != None and pDat != None:
res = yield self.saveRealTimeData(rcs, rDat)
res = yield self.savePeriodData(dbpool, pDat, conf)
print "err2:%s"%(err) # debug print to show flow control
@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
key = "somekey"
val = "somedata"
yield rc.set(key,val)
yield rc.expire(key,30)
@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
query = "some SQL statement"
yield rc.runQuery(query)
class PlainTCPFactory(protocol.Factory):
def buildProtocol(self, addr):
return PlainTCP(self)
def main():
dmdb = Dmdb()
if not dmdb.detectDb():
print "Please run MySQL RDBS first."
sys.exit()
log.startLogging(sys.stdout)
reactor.listenTCP(8080, PlainTCPFactory())
reactor.run()
if __name__ == "__main__":
main()
还有我的客户端剪辑,这是一个简单的客户端:
def connectSend(host="127.0.0.1",port=8080):
global packet
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
s.sendall(''.join(packet))
data = s.recv(1024)
s.close()
print 'Received', repr(data)
except socket.error, err:
print "Remote socket is not available: %s"%str(err)
sys.exit(1)
当前状态为:
- 如果禁用dataReceived()的@defer.inlineCallbacks和yield操作,connectionMode()和dataReceived()中的self.transport.write()都可以向客户端输出数据。
- 如果我们启用@defer.inlineCallbacks 和 SQL/Redis 的两个 yield DB 操作,那么 connectionMode() 内部的 self.transport.write() 每 5 个连接打印一次,而 dataReceived() 不会向客户端输出任何数据。
- 无论@defer.inlineCallbacks如何,调试打印语句都会打印在日志上。
我被告知 dataReceived() 不应该是 @defer.inlineCallbacks。但是,如果我删除了那个装饰,它不会改变任何东西。
我在想gevent是否可以帮助我摆脱这种不可预测的行为。我被卷成了无尽的龙卷风,旋风.....
请有类似经验的朋友帮帮我。谢谢
通过如下更改函数,代码可以正常工作。
#COMMENT OUT decorator of @defer.inlineCallbacks
def dataReceived(self, data):
global dbpool, rcs
(self.snrCode,rDat,pDat) = getDataParsed(data)
if self.snrCode == None or rDat == None or pDat == None:
err = "Bad format"
else:
err = "OK"
print "err:%s"%(err) # debug print to show flow control
self.err = err
self.transport.write(self.snrCode)
self.transport.write(self.err)
self.transport.write(rDat)
self.transport.write(pDat)
self.transport.loseConnection()
if self.snrCode != None and rDat != None and pDat != None:
self.saveRealTimeData(rcs, rDat)
self.savePeriodData(dbpool, pDat, conf)
# Removing yield before DB ops
print "err2:%s"%(err) # debug print to show flow control
@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
print "saveRedis"
key = "somekey"
val = "somedata"
yield rc.set(key,val)
yield rc.expire(key,30)
@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
print "save SQL"
query = "some SQL statement"
yield rc.runQuery(query)
如果我们保留@defer.inlineCallbacks 并在dataReceived 中yield。连接在第二个数据库操作之前关闭。因此没有数据输出到连接。可能是由 inlineCallbacks 装饰器引起的。
去掉这个,流量控制就简单明了。
但是,如果有两个延迟的数据库操作,我仍然可以理解为什么我不能添加 inlineCallbacks。这次他们不需要延期了?
在与 inlineCallbacks 和 yield of twisted/txredisapi 斗争之后,我可以将数据保存到 redis 中。感谢 txredisapi 的作者。现在我遇到了一个新问题,套接字服务器不会发送回客户端 before/after 保存到数据库中。
Twisted 提供如下简单的套接字服务器:
from twisted.internet import protocol, reactor
class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data) ### write back
class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return Echo()
reactor.listenTCP(8000, EchoFactory)
recctor.run()
我的代码很相似,只是增加了额外的数据库操作。
#!/usr/bin/env python
import time
import binascii
import txredisapi
from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory
from twisted.enterprise import adbapi
from twisted.python import log
from dmpack import Dmpack
from dmdb import Dmdb
from dmconfig import DmConf
dm = Dmpack()
conf = DmConf().loadConf()
rcs = txredisapi.lazyConnection(password=conf['RedisPassword'])
dbpool = adbapi.ConnectionPool("MySQLdb",db=conf['DbName'],user=conf['DbAccount'],\
passwd=conf['DbPassword'],host=conf['DbHost'],\
use_unicode=True,charset=conf['DbCharset'])
def getDataParsed(data):
realtime = None
period = None
self.snrCode = dm.snrToAscii(data[2:7])
realtime = data[7:167] # save it into redis
period = data[167:-2] # save it into SQL
return (snrCode, realtime, period)
class PlainTCP(protocol.Protocol):
def __init__(self, factory):
self.factory = factory
self.factory.numConnections = 0
self.snrCode = None
self.rData = None
self.pData = None
self.err = None
def connectionMade(self):
self.factory.numConnections += 1
print "Nr. of connections: %d\n" %(self.factory.numConnections)
self.transport.write("Hello remote\r\n") # it only prints very 5 connections.
def connectionLost(self, reason):
self.factory.numConnections -= 1
print "Nr. of connections: %d\n" %(self.factory.numConnections)
@defer.inlineCallbacks
def dataReceived(self, data):
global dbpool, rcs
(self.snrCode,rDat,pDat) = getDataParsed(data)
if self.snrCode == None or rDat == None or pDat == None:
err = "Bad format"
else:
err = "OK"
print "err:%s"%(err) # debug print to show flow control
self.err = err
self.transport.write(self.snrCode)
self.transport.write(self.err)
self.transport.write(rDat)
self.transport.write(pDat)
self.transport.loseConnection()
if self.snrCode != None and rDat != None and pDat != None:
res = yield self.saveRealTimeData(rcs, rDat)
res = yield self.savePeriodData(dbpool, pDat, conf)
print "err2:%s"%(err) # debug print to show flow control
@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
key = "somekey"
val = "somedata"
yield rc.set(key,val)
yield rc.expire(key,30)
@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
query = "some SQL statement"
yield rc.runQuery(query)
class PlainTCPFactory(protocol.Factory):
def buildProtocol(self, addr):
return PlainTCP(self)
def main():
dmdb = Dmdb()
if not dmdb.detectDb():
print "Please run MySQL RDBS first."
sys.exit()
log.startLogging(sys.stdout)
reactor.listenTCP(8080, PlainTCPFactory())
reactor.run()
if __name__ == "__main__":
main()
还有我的客户端剪辑,这是一个简单的客户端:
def connectSend(host="127.0.0.1",port=8080):
global packet
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
s.sendall(''.join(packet))
data = s.recv(1024)
s.close()
print 'Received', repr(data)
except socket.error, err:
print "Remote socket is not available: %s"%str(err)
sys.exit(1)
当前状态为:
- 如果禁用dataReceived()的@defer.inlineCallbacks和yield操作,connectionMode()和dataReceived()中的self.transport.write()都可以向客户端输出数据。
- 如果我们启用@defer.inlineCallbacks 和 SQL/Redis 的两个 yield DB 操作,那么 connectionMode() 内部的 self.transport.write() 每 5 个连接打印一次,而 dataReceived() 不会向客户端输出任何数据。
- 无论@defer.inlineCallbacks如何,调试打印语句都会打印在日志上。
我被告知 dataReceived() 不应该是 @defer.inlineCallbacks。但是,如果我删除了那个装饰,它不会改变任何东西。
我在想gevent是否可以帮助我摆脱这种不可预测的行为。我被卷成了无尽的龙卷风,旋风.....
请有类似经验的朋友帮帮我。谢谢
通过如下更改函数,代码可以正常工作。
#COMMENT OUT decorator of @defer.inlineCallbacks
def dataReceived(self, data):
global dbpool, rcs
(self.snrCode,rDat,pDat) = getDataParsed(data)
if self.snrCode == None or rDat == None or pDat == None:
err = "Bad format"
else:
err = "OK"
print "err:%s"%(err) # debug print to show flow control
self.err = err
self.transport.write(self.snrCode)
self.transport.write(self.err)
self.transport.write(rDat)
self.transport.write(pDat)
self.transport.loseConnection()
if self.snrCode != None and rDat != None and pDat != None:
self.saveRealTimeData(rcs, rDat)
self.savePeriodData(dbpool, pDat, conf)
# Removing yield before DB ops
print "err2:%s"%(err) # debug print to show flow control
@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
print "saveRedis"
key = "somekey"
val = "somedata"
yield rc.set(key,val)
yield rc.expire(key,30)
@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
print "save SQL"
query = "some SQL statement"
yield rc.runQuery(query)
如果我们保留@defer.inlineCallbacks 并在dataReceived 中yield。连接在第二个数据库操作之前关闭。因此没有数据输出到连接。可能是由 inlineCallbacks 装饰器引起的。
去掉这个,流量控制就简单明了。
但是,如果有两个延迟的数据库操作,我仍然可以理解为什么我不能添加 inlineCallbacks。这次他们不需要延期了?