gpt4 book ai didi

python - Twisted Python 中的另一个生产者/消费者问题

转载 作者:IT王子 更新时间:2023-10-29 06:05:58 26 4
gpt4 key购买 nike

我正在构建一个服务器,它使用 Twisted Python 在 Redis 之上存储键/值数据。服务器通过 HTTP 接收一个 JSON 字典,将其转换为 Python 字典并放入缓冲区。每次存储新数据时,服务器都会安排一项任务,使用 txredis 客户端从缓冲区中弹出一个字典并将每个元组写入 Redis 实例。

class Datastore(Resource):

isLeaf = True

def __init__(self):
self.clientCreator = protocol.ClientCreator(reactor, Redis)
d = self.clientCreator.connectTCP(...)
d.addCallback(self.setRedis)
self.redis = None
self.buffer = deque()


def render_POST(self, request):
try:
task_id = request.requestHeaders.getRawHeaders('x-task-id')[0]
except IndexError:
request.setResponseCode(503)
return '<html><body>Error reading task_id</body></html>'

data = json.loads(request.content.read())
self.buffer.append((task_id, data))
reactor.callLater(0, self.write_on_redis)
return ' '

@defer.inlineCallbacks
def write_on_redis(self):
try:
task_id, dic = self.buffer.pop()
log.msg('Buffer: %s' % len(self.buffer))
except IndexError:
log.msg('buffer empty')
defer.returnValue(1)

m = yield self.redis.sismember('DONE', task_id)
# Simple check
if m == '1':
log.msg('%s already stored' % task_id)
else:
log.msg('%s unpacking' % task_id)
s = yield self.redis.sadd('DONE', task_id)

d = defer.Deferred()
for k, v in dic.iteritems():
k = k.encode()
d.addCallback(self.redis.push, k, v)

d.callback(None)

基本上,我面临着两个不同连接之间的生产者/消费者问题,但我不确定当前的实现是否在 Twisted 范例中运行良好。我已经阅读了 Twisted 中关于生产者/消费者接口(interface)的小文档,但我不确定我是否可以在我的案例中使用它们。欢迎任何批评:经过多年的线程并发,我正试图掌握事件驱动编程。

最佳答案

Twisted 中的生产者和消费者 API,IProducerIConsumer,是关于流量控制的。您在这里似乎没有任何流量控制,您只是将消息从一种协议(protocol)中继到另一种协议(protocol)。

由于没有流量控制,缓冲区只是额外的复杂性。您可以通过将数据直接传递给 write_on_redis 方法来摆脱它。这样 write_on_redis 不需要处理空缓冲区的情况,你不需要资源上的额外属性,你甚至可以摆脱 callLater (尽管即使保留缓冲区也可以这样做。

不过,我不知道这是否能回答您的问题。至于这种方法是否“有效”,以下是我通过阅读代码注意到的事情:

  • 如果数据到达的速度快于 Redis 接受它的速度,您的未完成作业列表可能会变得任意大,从而导致内存不足。这就是流量控制的帮助。
  • 由于 sismember 调用或 sadd 调用没有错误处理,如果其中任何一个失败,您可能会丢失任务,因为您已经将它们从工作中弹出缓冲区。
  • 将推送作为对 Deferred d 的回调也意味着任何失败的推送都将阻止其余数据被推送。它还将 push 返回的 Deferred 的结果(我假设它返回一个 Deferred)作为下一个调用的第一个参数,所以除非 push 或多或少地忽略了它的第一个参数,否则您不会将正确的数据推送到 redis。

如果你想实现流量控制,那么你需要让你的 HTTP 服务器检查 self.buffer 的长度并可能拒绝新任务 - 添加它到 self.buffer 并向客户端返回一些错误代码。您仍然不会使用 IConsumerIProducer,但它们有点相似。

关于python - Twisted Python 中的另一个生产者/消费者问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/5157967/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com