gpt4 book ai didi

RabbitMQ在消费者重新连接之前丢失消息

转载 作者:行者123 更新时间:2023-12-01 07:07:09 26 4
gpt4 key购买 nike

我实现了一个消费者,如果底层连接关闭,它会在一段时间后自动重新连接到代理。我的情况如下:

  • 成功启动RabbitMQ服务器。
  • 成功启动消费者。
  • 发布消息,消费者成功接收。
  • 停止RabbitMQ服务器,消费者会显示异常:

    com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}.



    然后消费者将在重新连接前休眠 60 秒。
  • 再次启动 RabbitMQ 服务器。
  • 发布消息成功,命令'list_queues'的结果为0
  • 60 秒后,消费者再次连接到 RabbitMQ,但是现在收到的消息在步骤#6 中发布。
  • 发布第三条消息,消费者成功接收。

  • 在这种情况下,重新连接之前发布的所有消息都将丢失。
    我还进行了另一个实验。
  • 启动RabbitMQ,并成功发布消息(没有启动消费者进程)。
  • 停止RabbitMQ,然后重新启动它。
  • 启动消费者进程,成功接收步骤#1 发布的消息。

  • 注:消费者的QOS为1。
    我已经研究了 RabbitMQ 几天,据我所知,消费者应该在重新连接之前获得发布的消息。
    请帮助(我基于 windows rabbitMQ 进行了测试)。

    以下是发布者:
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(this.getHost());
    connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel = conn.createChannel();
    // declare a 'topic' type of exchange
    channel.exchangeDeclare(exchangeName, "topic");
    // Content-type "application/octet-stream", deliveryMode 2
    // (persistent), priority zero
    channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
    connection.close();

    消费者如下:
        @Override
    public void consume(final String exchangeName, final String queueName, final String routingKey,
    final int qos) throws IOException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(this.getHost());

    while (true) {
    Connection connection = null;
    try {
    connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(exchangeName, "topic");
    // declare a durable, non-exclusive, non-autodelete queue.
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    // distribute workload among all consumers, consumer will
    // pre-fetch
    // {qos}
    // messages to local buffer.
    channel.basicQos(qos);

    logger.debug(" [*] Waiting for messages. To exit press CTRL+C");

    QueueingConsumer consumer = new QueueingConsumer(channel);
    // disable auto-ack. If enable auto-ack, RabbitMQ delivers a
    // message to
    // the customer it immediately removes it from memory.
    boolean autoAck = false;
    channel.basicConsume(queueName, autoAck, consumer);

    while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    try {
    RabbitMessageConsumer.this.consumeMessage(delivery);
    }
    catch (Exception e) {
    // the exception shouldn't affect the next message
    logger.info("[IGNORE]" + e.getMessage());
    }
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
    }
    catch (Exception e) {
    logger.warn(e);
    }

    if (autoReconnect) {
    this.releaseConn(connection);
    logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
    + this.reconnectInterval / 1000 + " seconds.");
    Thread.sleep(this.getReconnectInterval());
    }
    else
    break;
    }
    }

    private void releaseConn(Connection conn) {
    try {
    if (conn != null)
    conn.close();
    }
    catch (Exception e) {
    // simply ignore this exception
    }
    }

    由于它是一个“主题”交换,因此在 PUBLISHER 中没有声明任何队列。然而,在第一个测试的第 3 步,持久队列已被声明,并且消息也是持久的。我不明白为什么在重新连接之前消息会丢失。

    最佳答案

    如果发布时没有队列,则消息将丢失。您是否连接到同一个队列,如果是,它是持久的,还是在重新启动 RMQ 服务器后发布消息之前重新创建它?听起来解决方案是以下之一:

  • 使队列持久化并在重启后重新连接到该队列
  • 确保在发布前创建队列。

  • 此外,请确保您重新连接到消费者中的正确队列。 1) 可能是两个解决方案中更好的一个。

    关于RabbitMQ在消费者重新连接之前丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18143984/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com