gpt4 book ai didi

java - spring amqp rabbitmq MessageListener 不工作

转载 作者:行者123 更新时间:2023-11-29 07:48:41 27 4
gpt4 key购买 nike

我正在尝试使用 spring amqp 来使用 rabbitmq,下面是我的配置。

<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" />

<beans:bean id="importExchangeMessageListener"
class="com.stockopedia.batch.foundation.ImportMessageListener" />

<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" />
</rabbit:listener-container>

这是一个简单的消息监听器类,

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class ImportMessageListener implements MessageListener {

@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
}

}

这是producer(即spring batch的itemWriter),

public class ImportItemWriter<T> implements ItemWriter<T> {

private AmqpTemplate template;

public AmqpTemplate getTemplate() {
return template;
}

public void setTemplate(AmqpTemplate template) {
this.template = template;
}

public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
}
}

}

当我运行我的 spring 批处理作业时,ImportItemWriter.write 被调用。但是 ImportMessageListener.onMessage 不起作用。它不打印消息。我在控制台上的所有项目都低于输出

producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null
producer output: null

最佳答案

您的消费者没有发送结果...

@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
}

改成简单的POJO;容器的 MessageListenerAdapter 将为您处理转换并发送结果。

@Override
public String handleMessage(String message) {
System.out.println("consumer output: " + message);
return "result";
}

编辑:

您还没有设置任何交换或路由到您的队列。如果您想使用默认交换/路由,请使用...

convertSendAndReceive("", queueName, item.toString());

编辑2:

或者,将模板上的routingKey设置为队列名,就可以使用更简单的方法。

...sendAndReceive() 方法用于请求/回复场景,因此需要阻塞。要异步执行此操作,您必须使用 ...send() 方法之一,并连接您自己的 SimpleListenerContainer 以接收回复;你将不得不做你自己的关联。使用

public void convertAndSend(Object message, MessagePostProcessor postProcessor)

并在您的消息后处理器中,设置 replyTocorrelationId header ...

message.getMessageProperties().setReplyTo("foo");
message.getMessageProperties().setCorrelationId("bar");

或者,自己构建 Message 对象(例如 by using the MessageBuilder )并使用 send 方法...

template.send(MessageBuilder.withBody("foo".getBytes())
.setReplyTo("bar")
.setCorrelationId("baz".getBytes())
.build());

每个请求都需要一个唯一的 correlationId,以便您可以关联响应。

关于java - spring amqp rabbitmq MessageListener 不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23188143/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com