gpt4 book ai didi

java - 发送方如何知道接收方在使用 Spring Boot 的 JMS 中不可用?

转载 作者:行者123 更新时间:2023-12-01 16:13:34 24 4
gpt4 key购买 nike

我有一个 Spring Boot 应用程序,有 2 个微服务,这些微服务使用 JMS 和 ActiveMq 进行异步通信。因此,发送方 (ms1) 向接收方 (ms2) 发送一条消息,发送方会将消息放入队列中,如果接收方不可用,则消息将保留在队列中,直到接收方可用。

我想问一下,发送方怎么知道接收方是否可用?我想知道,因为我想使用 Hystrix,如果接收方可用,发送方将显示如下消息:“交易成功完成!”,但如果接收方不可用,发送方将显示其他消息,例如this:“接收器服务当前不可用,消息将添加到队列中,并将在可用时发送到接收器”。

此代码来自发件人服务:

    @HystrixCommand(fallbackMethod="sendMessageToProducerFail")
private ResponseEntity sendMessageToProducer(String jsonStr) {
jmsTemplate.convertAndSend(queue, jsonStr);
return new ResponseEntity("Transaction successfully completed!", HttpStatus.CREATED);
}

private ResponseEntity sendMessageToProducerFail(String jsonStr) {
// "The Receiver service isn't currently availble, the message is added to the queue..."
}

最佳答案

没有简单的方法可以实现这一目标。确定消费者是否愿意带来一些设计选择。此外,对于您的问题,没有现成的解决方案。您可以使用心跳的概念来通知生产者有关消费者状态的信息。

+----------+               +-------------+              +--------+
| |<--Heartbeat---| |---Message--->| |
| Producer | | AMQP Server | |Consumer|
| |----Message--->| |<--Heartbeat--| |
+----------+ +-------------+ +--------+

您的设置看起来有点像这样,在这个生产者(ms1)中,将向 AMQP 服务器发送消息,并使用来自消费者的 Heartbeat 事件,以识别消费者是否还活着。当同一主题有多个生产者/消费者时,这将变得更加棘手。

一旦您拥有多个消费者,就会在同一主题/队列上发送多个心跳。您需要确定其中哪一个是活着的,您还需要考虑消费者的下降/上升。

每个心跳在生成时都可以有时间戳,以便您可以在更细粒度的级别上做出消费者 Activity 的决策。下一个问题是何时发送心跳?当您使用 Spring boot 时,您将无法灵活地在消息监听器之外发送心跳。如果您使用的是 MessageListener 接口(interface),请执行以下操作。

创建心跳发布者类

@Component
class HeartBeatPublisher {
public void registerMessageListener( String listenerName,
String topicOrQueueName,
Long intervals) {
// store this detail in a map
// do lazy init of threads to send heartbeat
}
}

您的消息监听器类想要

@Component
class MyMessageListener implements MessageListener {
@Autowired HeartBitPublisher heartBeatPublisher;
@PostConstruct
public void init(){
// 30 seconds interval
heartBeatPublisher.registerMessageListener("MyMessageListener",
"MyMessageListener",
30*1000 );
}
void onMessage(Message message){
// consume message here
}
}

如果您不使用 MessageListener,您仍然可以发送心跳,在这种情况下,我建议为每个组件添加一个监听器。

@Component
class MyMessageListener{
@Autowired HeartBeatPublisher heartBeatPublisher;
@PostConstruct
public void init(){
// 30 seconds interval
heartBeatPublisher.registerMessageListener("MyMessageListener",
"MyMessageListener",
30*1000 );
}

@RabbitListener(queues="myQueue")
public void onMessage(Object message) {
// Consume message
}
}

在生产者方面,您需要添加一个监听器来使用心跳检查哪些消费者处于 Activity 状态。一旦您知道哪些消费者处于 Activity 状态,您就可以使用它来限制消息发布。

@Component
class HeartBeatListener {
private List<String> queues;
@PostConstruct
public void init(){
// initialize queue and consumer status to inactive
// at certain intervals checks for missing heartbeat
// if you find heartbeats are not being sent that means either
// that consumer has died or there's a delay in heart bit
// publish in that case mark that consumer/topic/queue inactive
}

@RabbitListener(queues="myQueue-HeartBeat")
public void onMessage(Object message) {
// update consumer status
// Record heart beat for a given consumer/topic
}
}

您可以使用Consul来代替执行所有这些操作。 , Zookeeper ,或Etcd将一些作品转移到这些系统中。

关于java - 发送方如何知道接收方在使用 Spring Boot 的 JMS 中不可用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62465956/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com