gpt4 book ai didi

java - 如何在java中使用rabbitmq异步发送消息以将它们排队而不等待spring amqp中每条消息的回复?

转载 作者:行者123 更新时间:2023-11-29 05:24:32 30 4
gpt4 key购买 nike

我正在尝试使用 spring amqp 来使用 rabbitmq,下面是我的配置。

<rabbit:connection-factory id="rabbitConnectionFactory"
port="${rabbitmq.port}" host="${rabbitmq.host}" />

<rabbit:admin connection-factory="rabbitConnectionFactory" />

<rabbit:queue name="${rabbitmq.import.queue}" />

<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}" routing-key="${rabbitmq.import.queue}"/>

<rabbit:listener-container
connection-factory="rabbitConnectionFactory" concurrency="5">
<rabbit:listener queues="${rabbitmq.import.queue}" ref="importMessageListener" method="onMessage" />
</rabbit:listener-container>

这是一个简单的消息监听器类,

public class ImportMessageListener {

@Override
public void onMessage(Message message) {
System.out.println("consumer output: " + message);
return message;
}

}

这是producer(即spring batch的itemWriter),

public class ImportItemWriter<T> implements ItemWriter<T> {

private AmqpTemplate template;

public AmqpTemplate getTemplate() {
return template;
}

public void setTemplate(AmqpTemplate template) {
this.template = template;
}

public void write(List<? extends T> items) throws Exception {
for (T item : items) {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
}
}

}

当我运行我的 spring 批处理作业时,每条消息都被一条接一条地发送和处理,我收到了响应

consumer output: 1
producer output: 1

consumer output: 2
producer output: 2

consumer output: 3
producer output: 3

consumer output: 4
producer output: 4


consumer output: 5
producer output: 5

它应该发送 5 条消息并将它们排队,并且 5 个消费者线程(并发性 = 5)应该并发处理它们并且应该在完成后立即响应

So below should be the outout

consumer output: 1
consumer output: 2
consumer output: 3
consumer output: 4
consumer output: 5

producer output: 1
producer output: 2
producer output: 3
producer output: 4
producer output: 5

我不希望生产者等待第一条消息的回复来排队第二条消息。

我尝试使用 convertAndSend 使其异步(不等待回复)但是我如何在我的 itemWriter 中获得回复消息,就像我可以使用 convertSendAndReceive 一样?

如果我将模板配置更改为

<rabbit:template id="importAmqpTemplate"
connection-factory="rabbitConnectionFactory" queue="${rabbitmq.import.queue}"
routing-key="${rabbitmq.import.queue}" reply-queue="${rabbitmq.import.reply.queue}">
<rabbit:reply-listener/>
</rabbit:template>

如果我使用 template.convertAndSend(item.toString());那我怎样才能得到回复消息呢?

我无法将我自己的消息处理程序附加到此监听器以获取我们可以在消费者端附加的回复消息。对于回复,它采用默认的 RabbitmqTemplate 处理程序。

最佳答案

首先让我解释一下这是怎么回事

您使用同步 sendAndReceive 操作。在发送您的消息之前,它会使用 TemporaryReplyQueue 进行充实,并且发件人(生产者)线程会阻塞以等待来自该 replyQueue 的回复。这就是为什么您的所有消息都按顺序处理的原因。

为了继续,我们需要知道在单向 ItemWriter 生产者中使用该操作的原因。

也许 RabbitTemplate#convertAndSend 对您来说就足够了?

更新

也许你只需要RabbitTemplate.ReturnCallbackconvertAndSend 一起确定您的消息是否已送达?

更新 2

实现您使用 TaskExecutor 并行发送和接收的要求的另一种想法。这样你就不需要等待第一条消息的依赖来发送第二条消息:

final List<Object> replies = new ArrayList<Object>();
ExecutorService executor = Executors.newCachedThreadPool();
for (T item : items) {
executor.execute(new Runnable() {

public void run() {
Object reply = template.convertSendAndReceive(item.toString());
System.out.println("producer output: " + reply);
replies.add(reply);
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

然后您可以对当前项目的所有回复做一些事情。

关于java - 如何在java中使用rabbitmq异步发送消息以将它们排队而不等待spring amqp中每条消息的回复?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23196106/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com