gpt4 book ai didi

java - RabbitMQ Java 客户端 - 如何明智地处理异常和关闭?

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:01:45 25 4
gpt4 key购买 nike

这是我目前所知道的(请纠正我):

在 RabbitMQ Java 客户端中,对 channel 的操作抛出 IOException当出现一般网络故障时(来自代理的格式错误的数据、身份验证失败、错过的心跳)。

对 channel 的操作也可以抛出 ShutdownSignalException未经检查的异常,通常是 AlreadyClosedException当我们尝试在 channel /连接关闭后对其执行操作时。

关闭过程发生在 "network failure, internal failure or explicit local shutdown" 事件中(例如,通过 channel.close() 或 connection.close())。关闭事件沿“拓扑”向下传播,从连接 -> channel -> 消费者,当 channel 调用消费者的 handleShutdown() 时方法被调用。

用户还可以添加关闭监听器,在关闭过程完成后调用该监听器。

这是我所缺少的:

  • 既然 IOException 表示网络故障,它是否也发起了关闭请求?
  • 使用自动恢复模式如何影响关机请求?当它尝试重新连接到 channel 时,它是否会导致 channel 操作阻塞,或者仍然会抛出 ShutdownSignalException?

  • 这是我目前处理异常的方式,这是一种明智的方法吗?

    我的设置是我正在轮询 QueueingConsumer 并将任务分派(dispatch)到工作池。 rabbitmq 客户端封装在 MyRabbitMQWrapper这里。当轮询队列发生异常时,我只是优雅地关闭所有内容并重新启动客户端。当工作人员发生异常时,我也只是记录它并完成工作人员。

    我最大的担忧(与问题 1 相关):假设 IOException 发生在 worker 中,那么任务不会被确认。如果关闭没有发生,我现在有一个未确认的任务,它将永远处于不确定状态。

    伪代码:
    class Main {
    public static void main(String[] args) {
    while(true) {
    run();
    //Easy way to restart the client, the connection has been
    //closed so RabbitMQ will re-queue any un-acked tasks.
    log.info("Shutdown occurred, restarting in 5 seconds");
    Thread.sleep(5000);
    }
    }

    public void run() {
    MyRabbitMQWrapper rw = new MyRabbitMQWrapper("localhost");

    try {
    rw.connect();

    while(!Thread.currentThread().isInterrupted()) {
    try {
    //Wait for a message on the QueueingConsumer
    MyMessage t = rw.getNextMessage();
    workerPool.submit(new MyTaskRunnable(rw, t));
    } catch (InterruptedException | IOException | ShutdownSignalException e) {
    //Handle all AMQP library exceptions by cleaning up and returning
    log.warn("Shutting down", e);
    workerPool.shutdown();
    break;
    }
    }
    } catch (IOException e) {
    log.error("Could not connect to broker", e);
    } finally {
    try {
    rw.close();
    } catch(IOException e) {
    log.info("Could not close connection");
    }
    }
    }
    }

    class MyTaskRunnable implements Runnable {
    ....

    public void run() {
    doStuff();
    try {
    rw.ack(...);
    } catch (IOException | ShutdownSignalException e) {
    log.warn("Could not ack task");
    }
    }
    }

    最佳答案

    您可以查看 Lyra用于从意外的 Connection/Channel/Consumer 关闭中自动恢复。

    关于java - RabbitMQ Java 客户端 - 如何明智地处理异常和关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25262449/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com