gpt4 book ai didi

python - RPC 调用多个消费者

转载 作者:太空宇宙 更新时间:2023-11-03 15:22:45 25 4
gpt4 key购买 nike

我有一个监听消息的消费者,如果消息流超过了消费者的处理能力,我想启动这个消费者的另一个实例。

但我也希望能够轮询来自消费者的信息,我的想法是我可以使用 RPC 通过使用扇出交换从生产者那里请求此信息,以便所有生产者都获得 RPC 调用.

我的问题首先是这是否可能,其次是否合理?

最佳答案

如果问题是“是否可以向多个服务器发送 RPC 消息?”答案是肯定的。

当您构建 RPC 调用时,您将临时队列附加到消息(通常在 header.reply_to 中,但您也可以使用内部消息字段)。这是 RPC 目标将发布其答案的队列。

当您将 RPC 发送到单个服务器时,您可以在临时队列中收到多个消息:这意味着 RPC 应答可以通过以下方式形成:

  • 来自单一来源的单一信息
  • 来自单一来源的多个消息
  • 来自多个来源的不止一条消息

这种场景下出现的问题是

  • 你什么时候停止听?如果您知道 RPC 服务器的数量,您可以等到每个服务器都向您发送一个答案,否则您必须实现某种形式的超时
  • 您需要追踪答案的来源吗?您可以在消息中添加一些特殊字段以保留此信息。消息顺序也是如此。

只是一些代码来展示如何做到这一点(带有 Pika 库的 Python)。注意,这远非完美:最大的问题是当你得到一个新的答案时你应该重置超时。

    def consume_rpc(self, queue, result_len=1, callback=None, timeout=None, raise_timeout=False):
if timeout is None:
timeout = self.rpc_timeout

result_list = []

def _callback(channel, method, header, body):
print "### Got 1/%s RPC result" %(result_len)
msg = self.encoder.decode(body)
result_dict = {}
result_dict.update(msg['content']['data'])
result_list.append(result_dict)

if callback is not None:
callback(msg)

if len(result_list) == result_len:
print "### All results are here: stopping RPC"
channel.stop_consuming()

def _outoftime():
self.channel.stop_consuming()
raise TimeoutError

if timeout != -1:
print "### Setting timeout %s seconds" %(timeout)
self.conn_broker.add_timeout(timeout, _outoftime)

self.channel.basic_consume(_callback, queue=queue, consumer_tag=queue)

if raise_timeout is True:
print "### Start consuming RPC with raise_timeout"
self.channel.start_consuming()
else:
try:
print "### Start consuming RPC without raise_timeout"
self.channel.start_consuming()
except TimeoutError:
pass

return result_list

关于python - RPC 调用多个消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12407485/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com