gpt4 book ai didi

python - 将来自 Twisted `enterprise.adbapi` 的查询添加到由 `twistd` 守护程序创建的 react 器循环

转载 作者:太空宇宙 更新时间:2023-11-03 11:34:40 24 4
gpt4 key购买 nike

我在 Twisted .tac 插件中使用 twisted.enterprise.adbapi,发现延迟对象返回用于 aConnectionPool 等函数。除非调用 reactor.(run),否则 runQuery(sqlQuery) 不会触发。如何将查询添加到由 twistd 创建的 react 器循环而不是调用 reactor.run()?它是通用过程还是特定于异步数据库 API 的过程?

编辑 - 附上代码:

from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer

from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

import json
import base64
from twisted.enterprise import adbapi

class StringProducer(object):
implements(IBodyProducer)

def __init__(self, body):
self.body = body
self.length = len(body)

def startProducing(self, consumer):
consumer.write(self.body)
return succeed(None)

def pauseProducing(self):
pass

def stopProducing(self):
pass

def httpRequest(url, values, headers={}, method='POST'):

agent = Agent(reactor)
d = agent.request(method,
url,
Headers(headers),
StringProducer(values)
)

def handle_response(response):
if response.code == 204:
d = defer.succeed('')
else:
class SimpleReceiver(protocol.Protocol):
def __init__(s, d):
s.buf = ''; s.d = d
def dataReceived(s, data):
s.buf += data
response = json.loads(data)

receipt = response[u'receipt']
if receipt[u'product_id'] == "com.domain_name.app_name.a_product_id":
transactionID = receipt[u'original_transaction_id']
date = receipt[u'original_purchase_date']
purchaseDate = date.strip(' Etc/GMT')
print transactionID
print purchaseDate

dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user', passwd='passwd')
dOperation = dbpool.runOperation("insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)", ('testuser', transactionID, purchaseDate))

def finishInsert(dObject, pool):
print 'inserted!'
pool.close()
dOperation.addCallback(finishInsert, dbpool)

def insertError(dObject):
print 'insert error!'
dOperation.addErrback(insertError)

def connectionLost(s, reason):
s.d.callback(s.buf)

d = defer.Deferred()
response.deliverBody(SimpleReceiver(d))
return d

d.addCallback(handle_response)

class StoreServer(protocol.Protocol):

def dataReceived(self, data):
a = data.split(':delimiter:')

if a[0] == 'addToUserList':
receiptBase64 = base64.standard_b64encode(a[1])
jsonReceipt = json.dumps({'receipt-data':receiptBase64})

httpRequest(
"https://buy.itunes.apple.com/verifyReceipt",
jsonReceipt,
{'Content-Type': ['application/x-www-form-urlencoded']}
)

application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)

最佳答案

您的代码为它发出的每个请求创建一个新的 ConnectionPool。新的 ConnectionPool 创建自己的新线程池以在其中运行查询,并且必须建立与数据库的新连接。

这意味着您实际上没有连接池。您只需要创建和使用一次的大量连接。此外,errback insertError 不会关闭池。

这些结合起来意味着对可以同时创建的线程/连接的数量没有限制,除了你的系统对你可以分配多少内存或者你可以打开多少套接字施加的限制。当您遇到这些限制之一时,事情就变得不妙了。

这也意味着每个查询错误都会泄漏一些线程和连接(ConnectionPool 在启动时设置了 3 个线程/连接)。在出现足够多的错误后,您将无法再创建任何线程或连接,因此您将无法再查询您的数据库。您的查询很简单,您可能认为出错的可能性不大,但 MySQL 倾向于随机断开客户端(也许您至少有点意识到这一点,因为您确实添加了errback 报告失败)。

ConnectionPool 的预期用途是创建一个(或两个,或其他较小的固定数字),然后将其重新用于您的所有查询。我不知道这些问题是否与您最初观察到的问题有关,但它们可能是您应该解决的问题。

关于python - 将来自 Twisted `enterprise.adbapi` 的查询添加到由 `twistd` 守护程序创建的 react 器循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7883182/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com