gpt4 book ai didi

java - 防止 ActiveMQ 重新连接失败时自动退出

转载 作者:行者123 更新时间:2023-11-29 02:58:18 28 4
gpt4 key购买 nike

我设置了一个小型 spring-boot 应用程序,它连接到 ActiveMQ 上的一个或多个主题,这些主题在启动时在应用程序的 application.properties 文件中设置 - 然后将这些消息发送到数据库。

一切正常,但我在尝试实现故障转移时遇到了一些问题 - 基本上,应用程序将尝试重新连接,但在一定次数的重试后,应用程序进程将自动退出,阻止重试(理想情况下,我希望应用程序永远重试,直到手动终止或 ActiveMQ 再次可用)。我已经尝试将连接 URL 中的连接选项(例如 maxReconnectAttempts)(在 application.properties 中使用 url.options)显式设置为 -1/0/99999 但没有这些似乎是正确的,因为每次的行为都是一样的。通过查看关于 Apache's own reference page 的建议我也希望这种行为也能作为默认行为。

如果有人有任何建议强制应用程序不退出,我将不胜感激!我认为重要的代码部分如下:

@Configuration
public class AmqConfig {

private static final Logger LOG = LogManager.getLogger(AmqConfig.class);
private static final String LOG_PREFIX = "[AmqConfig] ";

private String clientId;

private static ArrayList<String> amqUrls = new ArrayList<>();
private static String amqConnectionUrl;
private static Integer numSubs;
private static ArrayList<String> destinations = new ArrayList<>();

@Autowired
DatabaseService databaseService;

public AmqConfig (@Value("${amq.urls}") String[] amqUrl,
@Value("${amq.options}") String amqOptions,
@Value("${tocCodes}") String[] tocCodes,
@Value("${amq.numSubscribers}") Integer numSubs,
@Value("${clientId}") String clientId) throws UnknownHostException {

Arrays.asList(amqUrl).forEach(url -> {
amqUrls.add("tcp://" + url);
});

String amqServerAddress = "failover:(" + String.join(",", amqUrls) + ")";
String options = Strings.isNullOrEmpty(amqOptions) ? "" : "?" + amqOptions;

this.amqConnectionUrl = amqServerAddress + options;

this.numSubs = Optional.ofNullable(numSubs).orElse(4);
this.clientId = Strings.isNullOrEmpty(clientId) ? InetAddress.getLocalHost().getHostName() : clientId;

String topic = "Consumer." + this.clientId + ".VirtualTopic.Feed";

if (tocCodes.length > 0){
Arrays.asList(tocCodes).forEach(s -> destinations.add(topic + "_" + s));
} else { // no TOC codes = connecting to default feed
destinations.add(topic);
}
}

@Bean
public ActiveMQConnectionFactory connectionFactory() throws JMSException {

LOG.info("{}Connecting to AMQ at {}", LOG_PREFIX, amqConnectionUrl);
LOG.info("{}Using client id {}", LOG_PREFIX, clientId);
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(amqConnectionUrl);

Connection conn = connectionFactory.createConnection();
conn.setClientID(clientId);
conn.setExceptionListener(new AmqExceptionListener());
conn.start();

destinations.forEach(destinationName -> {
try {
for (int i = 0; i < numSubs; i++) {
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(new MessageReceiver(databaseService, destinationName));
}

} catch (JMSException e) {
LOG.error("{}Error setting up queue @ {}", LOG_PREFIX, destinationName);
LOG.error(e.getMessage());
}
});

return connectionFactory;
}
}


public class MessageReceiver implements MessageListener, ExceptionListener {

public static final Logger LOG = LogManager.getLogger(MessageReceiver.class);
private static final String LOG_PREFIX = "[Message Receiver] ";

private DatabaseService databaseService;

public MessageReceiver(DatabaseService databaseService, String destinationName){
this.databaseService = databaseService;
LOG.info("{}Creating MessageReceiver for queue with destination: {}", LOG_PREFIX, destinationName);
}

@Override
public void onMessage(Message message) {

String messageText = null;

if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
try {
messageText = tm.getText();
} catch (JMSException e) {
LOG.error("{} Error getting message from AMQ", e);
}
} else if (message instanceof ActiveMQMessage) {
messageText = message.toString();
} else {
LOG.warn("{}Unrecognised message type, cannot process", LOG_PREFIX);
LOG.warn(message.toString());
}

try {
databaseService.sendMessageNoResponse(messageText);
} catch (Exception e) {
LOG.error("{}Unable to acknowledge message from AMQ. Message: {}", LOG_PREFIX, messageText, e);
}
}
}

public class AmqExceptionListener implements ExceptionListener {

public static final Logger LOG = LogManager.getLogger(AmqExceptionListener.class);
private static final String LOG_PREFIX = "[AmqExceptionListener ] ";

@Override
public void onException(JMSException e){
LOG.error("{}Exception thrown by ActiveMQ", LOG_PREFIX, e);
}
}

我从我的应用程序中获得的控制台输出如下所示(抱歉,因为它没有太大意义)

[2019-12-12 14:43:30.292] [WARN ] Transport (tcp://[address]:61616) failed , attempting to automatically reconnect: java.io.EOFException
[2019-12-12 14:43:51.098] [WARN ] Failed to connect to [tcp://[address]:61616] after: 10 attempt(s) continuing to retry.

Process finished with exit code 0

最佳答案

非常有趣的问题!

配置 maxReconnectAttempts=-1 会导致连接尝试永远重试,但我觉得这里的问题如下:

  1. 您在 App 创建 Bean 时尝试连接到 ActiveMQstartup,如果APP启动时ActiveMQ没有运行,则Bean 创建将永远重试连接尝试,从而导致超时,不让APP启动。
  2. 此外,当 ActiveMQ 中途停止运行时,您不会重新尝试连接,因为它是在@Bean 内部完成的,只会在应用程序启动时发生

因此连接不应该在创建 Bean 时发生,但也许可以在应用程序启动后完成(可能在 @PostConstruct block 内)

这些只是提示,你需要把它向前推进

希望这对您有所帮助!

祝你好运!

关于java - 防止 ActiveMQ 重新连接失败时自动退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59308073/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com