gpt4 book ai didi

java - 使用spring amqp模板清除rabbitmq队列?

转载 作者:行者123 更新时间:2023-12-02 08:00:36 25 4
gpt4 key购买 nike

我正在使用 Spring Batch Item writer 中的 Spring amqp 模板将消息添加到 RabbitMQ 队列。

public class AmqpAsynchRpcItemWriter<T> implements ItemWriter<T> {

protected String exchange;
protected String routingKey;
protected String queue;
protected String replyQueue;
protected RabbitTemplate template;
BlockingQueue<Object> blockingQueue;


public void onMessage(Object msgContent) {

try {
blockingQueue.put(msgContent);
} catch (InterruptedException e) {
e.printStackTrace();
}


}

@Override
public void write(List<? extends T> items) throws Exception {

for (T item : items) {

Message message = MessageBuilder
.withBody(item.toString().getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setReplyTo(this.replyQueue)
.setCorrelationId(item.toString().getBytes()).build();

template.send(this.exchange, this.routingKey, message);

}

for (T item : items) {

Object msg = blockingQueue.poll(60, TimeUnit.SECONDS);

if (msg instanceof Exception) {
throw (Exception) msg;
} else if (msg == null) {
System.out.println("reply timeout...");
break;
}

}
}

}

消息将在不同的远程服务器上处理。我正在尝试处理这样的用例:如果我的消息处理失败(由于某些异常),步骤执行将停止。

我想清除该队列中的所有剩余消息,以便队列中的剩余消息不应该被消耗和处理,因为它们也会失败。

如果该步骤失败,我的项目编写器将再次对所有消息进行排队,因此我需要清除任何异常时的所有剩余消息。

如何使用 spring amqp 清除队列?

最佳答案

我可以使用

admin.purgeQueue(this.queue, true);

关于java - 使用spring amqp模板清除rabbitmq队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23325743/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com