作者热门文章
- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
这是我目前所知道的(请纠正我):
在 RabbitMQ Java 客户端中,对 channel 的操作抛出 IOException
当出现一般网络故障时(来自代理的格式错误的数据、身份验证失败、错过的心跳)。
对 channel 的操作也可以抛出 ShutdownSignalException
未经检查的异常,通常是 AlreadyClosedException
当我们尝试在 channel /连接关闭后对其执行操作时。
关闭过程发生在 "network failure, internal failure or explicit local shutdown" 事件中(例如,通过 channel.close() 或 connection.close())。关闭事件沿“拓扑”向下传播,从连接 -> channel -> 消费者,当 channel 调用消费者的 handleShutdown()
时方法被调用。
用户还可以添加关闭监听器,在关闭过程完成后调用该监听器。
这是我所缺少的:
MyRabbitMQWrapper
这里。当轮询队列发生异常时,我只是优雅地关闭所有内容并重新启动客户端。当工作人员发生异常时,我也只是记录它并完成工作人员。
class Main {
public static void main(String[] args) {
while(true) {
run();
//Easy way to restart the client, the connection has been
//closed so RabbitMQ will re-queue any un-acked tasks.
log.info("Shutdown occurred, restarting in 5 seconds");
Thread.sleep(5000);
}
}
public void run() {
MyRabbitMQWrapper rw = new MyRabbitMQWrapper("localhost");
try {
rw.connect();
while(!Thread.currentThread().isInterrupted()) {
try {
//Wait for a message on the QueueingConsumer
MyMessage t = rw.getNextMessage();
workerPool.submit(new MyTaskRunnable(rw, t));
} catch (InterruptedException | IOException | ShutdownSignalException e) {
//Handle all AMQP library exceptions by cleaning up and returning
log.warn("Shutting down", e);
workerPool.shutdown();
break;
}
}
} catch (IOException e) {
log.error("Could not connect to broker", e);
} finally {
try {
rw.close();
} catch(IOException e) {
log.info("Could not close connection");
}
}
}
}
class MyTaskRunnable implements Runnable {
....
public void run() {
doStuff();
try {
rw.ack(...);
} catch (IOException | ShutdownSignalException e) {
log.warn("Could not ack task");
}
}
}
最佳答案
您可以查看 Lyra用于从意外的 Connection/Channel/Consumer 关闭中自动恢复。
关于java - RabbitMQ Java 客户端 - 如何明智地处理异常和关闭?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25262449/
我是一名优秀的程序员,十分优秀!