gpt4 book ai didi

python - 如何在 pythonrabbitmq 中检查消息是否在过去 5 秒内未发送

转载 作者:太空宇宙 更新时间:2023-11-03 15:03:12 31 4
gpt4 key购买 nike

你好,我正在尝试使用rabbitmq在python中编写代码。我有一个发送消息的队列,但我必须检查消费者是否在过去 5 秒内发送了消息,如果没有发送,我应该终止该进程。我尝试在互联网上搜索此类功能,但没有相关答案,您能给我推荐一些东西吗?

最佳答案

RabbitMQ 包含一个心跳来检测无响应的对等点/失败的消息

From the docs:

Detecting Dead TCP Connections with Heartbeats

In some types of network failure, packet loss can mean that disrupted TCP connections take a moderately long time (about 11 minutes with default configuration on Linux, for example) to be detected by the operating system. AMQP 0-9-1 offers a heartbeat feature to ensure that the application layer promptly finds out about disrupted connections (and also completely unresponsive peers). Heartbeats also defend against certain network equipment which may terminate "idle" TCP connections.

启用 Hearbeats使用 Java 客户端:

ConnectionFactory cf = new ConnectionFactory();

// set the heartbeat timeout to 5 seconds
cf.setRequestedHeartbeat(5);

与.NET客户端类似:

var cf = new ConnectionFactory();

// set the heartbeat timeout to 5 seconds
cf.RequestedHeartbeat = 5;

希望这有帮助。

(rabbitmq 文档中有更多关于 dead-letter exchanges 的信息,还有关于 nack 和 ack/(neg/pos) 交付/确认 on this page 但配置 Heartbeats 应该可以解决问题。)

编辑:抱歉,还有一个python remote procedure callback example在文档中!它需要'pika' ..错过了!

服务器代码示例:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
n = int(body)

print(" [.] fib(%s)" % n)
response = fib(n)

ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

客户端代码示例:

#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()

result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue

self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

关于python - 如何在 pythonrabbitmq 中检查消息是否在过去 5 秒内未发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44890908/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com