gpt4 book ai didi

Java RabbitMQ Consumer.nextMessage 总是获取相同的消息

转载 作者:行者123 更新时间:2023-12-02 12:16:05 25 4
gpt4 key购买 nike

我们在分布式服务架构中使用带有 Spring Boot 的 Java RabbitMq。一项服务获取 HTTP 请求并将其转发到未知队列进行处理。同时它必须等待另一个队列上的响应才能终止 HTTP 请求。 (这是一个预览请求,由渲染器完成其工作)。

ServiceA(HTTP 接口(interface))和 ServiceB(渲染器)可以有多个实例,因此对于每条预览消息,我们还发送一个唯一的 ID 用作路由 key 。

我在使用 BlockingConsumer 时遇到问题。每当我调用consumer.nextMessage()时,我都会一遍又一遍地收到相同的消息。这是双重奇怪的,对于一个它应该被确认并从队列中删除,对于另一个消费者甚至不应该打扰它,因为我们使用的唯一 ID 不再绑定(bind)到队列。 nextMessage 甚至在渲染器服务完成之前返回并发送回完成消息。

这是简化的设置:

一般

所有服务都使用全局 DirectExchange 来处理所有消息

@Bean
public DirectExchange globalDirectExchange() {
return new DirectExchange(EXCHANGE_NAME, false, true);
}

ServiceA(处理 HTTP 请求):

 private Content requestPreviewByKey(RenderMessage renderMessage, String previewKey) {
String renderDoneRoutingKey= UUID.randomUUID().toString();
renderMessage.setPreviewDoneKey(renderDoneId);
Binding binding = BindingBuilder.bind(previewDoneQueue).to(globalDirectExchange)
.with(renderDoneRoutingKey);
try {
amqpAdmin.declareBinding(binding);
rabbitProducer.sendPreviewRequestToKey(renderMessage, previewKey);
return getContentBlocking();
} catch (Exception e) {
logErrorIfDebug(type, e);
throw new ApiException(BaseErrorCode.COMMUNICATION_ERROR, "Could not render preview");
} finally {
amqpAdmin.removeBinding(binding);
}
}


private Content getContentBlocking() {
BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(rabbitMqConfig.connectionFactory(), new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<>(), AcknowledgeMode.AUTO, true, 1, PREVIEW_DONE_QUEUE);
try {
blockingQueueConsumer.start();
Message message = blockingQueueConsumer.nextMessage(waitForPreviewMs);
if (!StringUtils.isEmpty(message)) {
String result = new String(message.getBody());
return JsonUtils.stringToObject(result, Content.class);
}
throw new ApiException("Could not render preview");
} catch (Exception e) {
logError(e);
throw new ApiException("Could not render preview");
} finally {
blockingQueueConsumer.stop();
}

}

服务B

我将为您节省大部分代码。我的日志显示一切进展顺利,一旦完成,服务就会将正确的消息发送到与初始渲染请求一起发送的 UUID 键。

public void sendPreviewDoneMessage(Content content, String previewDoneKey) {
String message = JsonUtils.objectToString(content);
rabbitTemplate.convertAndSend(globalDirectExchange, previewDoneKey, message);
}

整个事情都有效...一次...真正的问题似乎是消费者的设置。当我使用 nextMessage() 时,为什么我不断从队列中获取相同的(第一条)消息。创建和删除 Bindung 是否不能确保在该实例中只接收绑定(bind)到该routingKey 的消息? nextMessage() 不会确认消息并将其从队列中删除吗?!

非常感谢您的耐心等待,更感谢您提供有用的答案!

最佳答案

BlockingQueueConsumer 并非设计用于直接使用;它是 SimpleMessageListenerContainer 的一个组件,它将在消息被监听器使用后负责确认消息(容器调用 commitIfNecessary)。

直接使用此消费者可能会产生其他意想不到的副作用。

我强烈建议使用监听器容器来消费消息。

如果您只想按需接收消息,请使用 RabbitTemplate receive()receiveAndConvert() 方法。

关于Java RabbitMQ Consumer.nextMessage 总是获取相同的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46177925/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com