当您收到的消息超出您的处理能力时。解决方案是限制内部队列大小:
// Limit to max 300 messages
QueueOptions options = new QueueOptions()
.setMaxInternalQueueSize(300);
RabbitMQClient client = RabbitMQClient.create(vertx, new RabbitMQOptions());
client.basicConsumer("my.queue", options, res -> {
if (res.succeeded()) {
System.out.println("RabbitMQ consumer created !");
RabbitMQConsumer mqConsumer = res.result();
mqConsumer.handler((RabbitMQMessage message) -> {
System.out.println("Got message: " + message.body().toString());
});
} else {
res.cause().printStackTrace();
}
});
问题是当超过内部队列的队列容量时,新消息将被简单地丢弃。
如何处理背压而不丢失任何消息?
我最近使用client.basicQos()
函数来限制rabbit的消息消耗并且它运行顺利。
此函数的第一个参数确定预取消息的数量保留在您的消费者(或整个 channel )中,因此它应该像内部队列大小一样工作,但不会获取和丢弃消息。
我是一名优秀的程序员,十分优秀!