gpt4 book ai didi

redis - 仅当我使用异步数据库操作时,Twisted 才不会发回数据

转载 作者:可可西里 更新时间:2023-11-01 11:21:00 24 4
gpt4 key购买 nike

在处理了 inlineCallbacks 和 twisted/txredisapi 的 yield 之后,我可以将数据保存到 redis 中。感谢 txredisapi 的作者。现在我遇到了一个新问题,在保存到数据库之前/之后,套接字服务器不会发送回客户端。

Twisted 提供如下简单的套接字服务器:

from twisted.internet import protocol, reactor

class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data) ### write back

class EchoFactory(protocol.Factory):
def buildProtocol(self, addr):
return Echo()

reactor.listenTCP(8000, EchoFactory)
recctor.run()

我的代码是类似的,只是有额外的数据库操作。

#!/usr/bin/env python

import time
import binascii
import txredisapi

from twisted.internet import defer
from twisted.internet import protocol, reactor
from twisted.internet.protocol import Factory
from twisted.enterprise import adbapi
from twisted.python import log

from dmpack import Dmpack
from dmdb import Dmdb
from dmconfig import DmConf

dm = Dmpack()
conf = DmConf().loadConf()
rcs = txredisapi.lazyConnection(password=conf['RedisPassword'])
dbpool = adbapi.ConnectionPool("MySQLdb",db=conf['DbName'],user=conf['DbAccount'],\
passwd=conf['DbPassword'],host=conf['DbHost'],\
use_unicode=True,charset=conf['DbCharset'])

def getDataParsed(data):
realtime = None
period = None
self.snrCode = dm.snrToAscii(data[2:7])
realtime = data[7:167] # save it into redis
period = data[167:-2] # save it into SQL
return (snrCode, realtime, period)

class PlainTCP(protocol.Protocol):
def __init__(self, factory):
self.factory = factory
self.factory.numConnections = 0
self.snrCode = None
self.rData = None
self.pData = None
self.err = None

def connectionMade(self):
self.factory.numConnections += 1
print "Nr. of connections: %d\n" %(self.factory.numConnections)
self.transport.write("Hello remote\r\n") # it only prints very 5 connections.

def connectionLost(self, reason):
self.factory.numConnections -= 1
print "Nr. of connections: %d\n" %(self.factory.numConnections)

@defer.inlineCallbacks
def dataReceived(self, data):
global dbpool, rcs
(self.snrCode,rDat,pDat) = getDataParsed(data)

if self.snrCode == None or rDat == None or pDat == None:
err = "Bad format"
else:
err = "OK"
print "err:%s"%(err) # debug print to show flow control
self.err = err

self.transport.write(self.snrCode)
self.transport.write(self.err)
self.transport.write(rDat)
self.transport.write(pDat)
self.transport.loseConnection()

if self.snrCode != None and rDat != None and pDat != None:
res = yield self.saveRealTimeData(rcs, rDat)
res = yield self.savePeriodData(dbpool, pDat, conf)

print "err2:%s"%(err) # debug print to show flow control


@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
key = "somekey"
val = "somedata"
yield rc.set(key,val)
yield rc.expire(key,30)

@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
query = "some SQL statement"
yield rc.runQuery(query)

class PlainTCPFactory(protocol.Factory):
def buildProtocol(self, addr):
return PlainTCP(self)

def main():
dmdb = Dmdb()
if not dmdb.detectDb():
print "Please run MySQL RDBS first."
sys.exit()

log.startLogging(sys.stdout)

reactor.listenTCP(8080, PlainTCPFactory())
reactor.run()

if __name__ == "__main__":
main()

还有我的客户端剪辑,这是一个简单的客户端:

def connectSend(host="127.0.0.1",port=8080):
global packet
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.connect((host, port))
s.sendall(''.join(packet))
data = s.recv(1024)
s.close()
print 'Received', repr(data)
except socket.error, err:
print "Remote socket is not available: %s"%str(err)
sys.exit(1)

当前状态是:

  • 如果禁用 @defer.inlineCallbacks 和 dataReceived() 的 yield 操作,connectionMode() 和 dataReceived() 内部的 self.transport.write() 都可以向客户端输出数据。
  • 如果我们启用@defer.inlineCallbacks 和 SQL/Redis 的两个 yield DB 操作,则 connectionMode() 中的 self.transport.write() 每 5 个连接打印一次,而 dataReceived() 将不会向客户端输出任何数据。
  • 不管@defer.inlineCallbacks 无论如何,调试打印语句都会打印在日志上。

我被告知 dataReceived() 不应该是 @defer.inlineCallbacks。但是如果我删除那个装饰,它不会改变任何东西。

我在想 gevent 是否可以帮助我摆脱这种不可预测的行为。我被扭曲成无尽的 Tornado ,旋风.....

请哪位有类似经验的帮帮我。谢谢。

最佳答案

通过如下更改函数,代码可以工作。

#COMMENT OUT decorator of @defer.inlineCallbacks

def dataReceived(self, data):
global dbpool, rcs
(self.snrCode,rDat,pDat) = getDataParsed(data)

if self.snrCode == None or rDat == None or pDat == None:
err = "Bad format"
else:
err = "OK"
print "err:%s"%(err) # debug print to show flow control
self.err = err

self.transport.write(self.snrCode)
self.transport.write(self.err)
self.transport.write(rDat)
self.transport.write(pDat)
self.transport.loseConnection()

if self.snrCode != None and rDat != None and pDat != None:
self.saveRealTimeData(rcs, rDat)
self.savePeriodData(dbpool, pDat, conf)
# Removing yield before DB ops

print "err2:%s"%(err) # debug print to show flow control


@defer.inlineCallbacks
def saveRealTimeData(self, rc, dat):
print "saveRedis"
key = "somekey"
val = "somedata"
yield rc.set(key,val)
yield rc.expire(key,30)

@defer.inlineCallbacks
def savePeriodData(self,rc,dat,conf):
print "save SQL"
query = "some SQL statement"
yield rc.runQuery(query)

如果我们在 dataReceived 中保留 @defer.inlineCallbacks 和 yield。连接在第二个数据库操作之前关闭。因此没有数据输出到连接。可能是由 inlineCallbacks 装饰器引起的。

通过删除它,流量控制变得简单明了。

但是,如果有两个延迟的数据库操作,我仍然可以理解为什么我不能添加 inlineCallbacks。这次他们不需要延期?

关于redis - 仅当我使用异步数据库操作时,Twisted 才不会发回数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32709539/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com