
python - Why are messages in the RabbitMQ queue lost after restarting the consumer?


I have set up a RabbitMQ consumer as follows:

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.INFO,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name = queue
        self.logger = logger
        self.consumer_id = 'Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(self.handle_delivery,
                                       queue=self.queue_name,
                                       no_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            self.channel.queue_declare(queue=self.queue_name,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_connected(self, connection):
        connection.channel(self._on_channel_open)

    def consume(self):
        try:
            self.connection = SelectConnection(self.parameters,
                                               self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body

        return _body

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))

            # Do something
            sleep(randint(10, 100))

            time_taken = datetime.datetime.now() - start_time
            _logger.info("[{}] Time Taken: {}.{}".format(
                req.get("to_num"), time_taken.seconds, time_taken.microseconds))

        except Exception as err:
            _logger.exception(err)


if __name__ == "__main__":
    workers = 3
    pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
    try:
        pool = ThreadPoolExecutor(max_workers=workers)
        start = 1
        for thread_id in range(start, (workers + start)):
            pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)

    except Exception as err:
        _logger.exception(err)

I also have a queue publisher, as follows:

import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.DEBUG,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueuePublisherClient(object):

    def __init__(self, queue, request):
        self.queue = queue
        self.response = None
        self.channel = None
        self.request = request
        self.corrId = str(uuid.uuid4())
        self.callBackQueue = None
        self.connection = None
        parameters = pika.ConnectionParameters(host="0.0.0.0")
        self.connection = SelectConnection(
            parameters, self.on_response_connected
        )
        self.connection.ioloop.start()

    def on_response(self, ch, method, props, body):
        if self.corrId == props.correlation_id:
            self.response = body
            self.connection.close()
            self.connection.ioloop.start()

    def on_response_connected(self, connection):
        _logger.info("Connected...\t(%s)" % self.queue)
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_connected(self, connection):
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        # _logger.info("Channel Opened...\t(%s)" % self.queue)
        self.channel = channel
        self.channel.queue_declare(queue=self.queue,
                                   durable=True,
                                   exclusive=False,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.channel.basic_publish(exchange="",
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(),
                                   body=str(self.request))
        self.connection.close()
        _logger.info("Message Published...\t(%s)" % self.queue)


if __name__ == "__main__":
    data = {
        'text': 'This is a sample text',
        'to_num': '+2547xxxxxxxx'
    }
    count = 10000

    for index in range(count):
        data['index'] = index
        QueuePublisherClient("test_queue", json.dumps(data))

When I publish 10000 messages to the queue while the consumer is not running, rabbitmqctl list_queues shows that test_queue holds 10000 messages. As soon as I start the consumer, rabbitmqctl list_queues reports the queue at 0 messages, yet the consumer is still working through them. The problem is that if I stop the consumer after a few seconds and then restart it, I cannot recover the unprocessed messages. How can I avoid this?

This merely simulates my real situation, where the consumer process is restarted by monit and I suffer message loss.

Best Answer

First, you should use the most recent version of Pika.
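Note that the basic_consume signature changed in Pika 1.0: the queue name comes first, the handler is passed as on_message_callback, and no_ack was renamed auto_ack. As a minimal sketch, reusing the names from your code, the consume call in Pika 1.x would look like this:

# Pika 1.x: queue first, callback as a keyword, auto_ack instead of no_ack.
# auto_ack defaults to False, so messages stay in the queue until they are
# explicitly acknowledged.
self.channel.basic_consume(queue=self.queue_name,
                           on_message_callback=self.handle_delivery,
                           auto_ack=False)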

When you set no_ack=True (auto_ack=True in Pika 1.0), RabbitMQ considers a message acknowledged the moment it is delivered. That means every message your consumer is holding in memory (or in the TCP stack) when you stop it is lost, because RabbitMQ believes it has already been acknowledged. Note also that your prefetch_count=1 has no effect in this mode: basic.qos is ignored for no-ack consumers, which is why rabbitmqctl list_queues drops to 0 as soon as the consumer starts; RabbitMQ pushes every message out immediately.

You should use no_ack=False (the default) and acknowledge each message in handle_delivery once the work for it is done. Note that if your work takes a long time, you should run it in another thread so that it does not block Pika's I/O loop. A sketch of that pattern follows.
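As an illustration, here is a minimal sketch assuming Pika 1.x and the QueueConsumer class above. The _work helper is hypothetical (it is not in your code), and the approach follows pika's threaded-consumer example: the slow work runs on its own thread, and the acknowledgement is handed back to the connection's I/O loop with add_callback_threadsafe, because channels are not thread-safe:

import functools
import threading

def handle_delivery(self, channel, method, header, body):
    # Hand the slow work to a separate thread so Pika's I/O loop
    # keeps servicing heartbeats and deliveries.
    worker = threading.Thread(target=self._work,
                              args=(channel, method.delivery_tag, body),
                              daemon=True)
    worker.start()

def _work(self, channel, delivery_tag, body):
    try:
        req = json.loads(self.decode(body))
        sleep(randint(10, 100))  # the long-running "work" from the question

        # Channels are not thread-safe: schedule basic_ack on the I/O loop
        # instead of calling it from this worker thread.
        ack = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
        self.connection.add_callback_threadsafe(ack)
    except Exception as err:
        _logger.exception(err)

With acknowledgements enabled, any message that was delivered but never acked is requeued by RabbitMQ when the consumer stops, which is exactly the restart behaviour you want.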

See the documentation on acknowledgements here: https://www.rabbitmq.com/confirms.html


NOTE: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.

Source question on Stack Overflow: https://stackoverflow.com/questions/55974797/
