- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有 hornetQ 应用程序正在运行。我正在尝试将其移至 Artemis。
我使用jmsTemplate来交换消息。jmsTemplate.sendAndReceive() 适用于服务器和客户端。我为服务器和客户端创建了一个带有自定义 messageListener 的消费者。它适用于 hornetQ,但发送给它们的消息永远不会到达 messageListener。
我是否必须更改 session.createConsumer(tq).setMessageListener(new MyMessageListener) 中的某些内容?
我在服务器日志中得到了这个:
org.apache.activemq.artemis.core.server : AMQ221003: Deploying queue jms.queue.e746ebf4-de2d-4257-84b0-975d94b5536a
org.apache.activemq.artemis.core.server : AMQ222165: No Dead Letter Address configured for queue jms.queue.e746ebf4-de2d-4257-84b0-975d94b5536a in AddressSettings
org.apache.activemq.artemis.core.server : AMQ222166: No Expiry Address configured for queue jms.queue.e746ebf4-de2d-4257-84b0-975d94b5536a in AddressSettings
1-2 分钟后,它会失去连接并请求新的连接。
应用程序属性
spring.artemis.mode=embedded
spring.artemis.embedded.enabled=true
spring.artemis.embedded.queues=connection
定制器
@Component
public class ArtemisCustomizer {
@Bean
public ArtemisConfigurationCustomizer artemisConfigurationCustomizer() {
return new ArtemisConfigurationCustomizer() {
@Override
public void customize(Configuration configuration) {
Map<String, Object> transportProperties = new HashMap<String, Object>();
transportProperties.put(TransportConstants.HOST_PROP_NAME, "0.0.0.0");
transportProperties.put(TransportConstants.PORT_PROP_NAME, port);
// transportProperties.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
// transportProperties.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, keystorePath);
// transportProperties.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, keystorePassword);
// transportProperties.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, truststorePath);
// transportProperties.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, truststorePassword);
Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
acceptors.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), transportProperties));
}
};
}
}
消费者
@Component
public class ConnectionConsumer {
private Map<Long, String> queueMap = new HashMap<>();
@JmsListener(destination = "connection")
public void process(Message message) {
log.info("JMS message received: {}", message);
try {
Client client = agentConnected(message.getBody(AgentConnection.class)); // here it logs the client connected and get the info
if (client != null) {
jmsTemplate.send(message.getJMSReplyTo(), new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TemporaryQueue tq = session.createTemporaryQueue();
session.createConsumer(tq).setMessageListener(new ClientMessageListener(client.getId(), receivingMessageService));
return session.createObjectMessage(new AgentUseQueue(tq.getQueueName()));
}
});
}
} catch (Exception e) {
log.error("Error processing JMS message:", e);
}
}
消息监听器
public class ClientMessageListener implements MessageListener {
private final long clientId;
private final ReceivingMessages agentService;
@Override
public void onMessage(Message message) {
try {
Object o = message.getBody(Object.class);
log.info("Received from clientId {} the message: {}", clientId, o);
.....
if (o instanceof HeartbeatMessage) {
service.heartbeatReceived(clientId, ((HeartbeatMessage) o).getInstances());
}
.....
} catch (Exception e) {
log.error("Could not interpret JMS message: ", e);
}
}
}
应用程序属性
spring.artemis.mode=native
定制器
@Component
public class ArtemisCustomizer {
@Bean
public ConnectionFactory jmsConnectionFactory() {
Map<String, Object> transportProperties = new HashMap<String, Object>();
transportProperties.put(TransportConstants.HOST_PROP_NAME, ip);
transportProperties.put(TransportConstants.PORT_PROP_NAME, port);
// log.info("SSL enabled: {}", true);
// transportProperties.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
// transportProperties.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, keystorePath);
// transportProperties.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, keystorePassword);
return new ActiveMQConnectionFactory(true, new TransportConfiguration(NettyConnectorFactory.class.getName(), transportProperties));
}
}
连接
@Component
public class ConnectionJms {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Properties properties;
@Autowired
private Outbound outbound;
...
@PostConstruct
public void configure() {
jmsTemplate.setReceiveTimeout(60000);
}
@Scheduled(fixedDelay = 30000, initialDelay = 10000)
public void connect() {
if (properties.getJmsQueueSender() == null || properties.getLastMessageReceived() == null || (System.currentTimeMillis() - properties.getLastMessageReceived()) > 120000) {
try {
Message message = this.jmsTemplate.sendAndReceive(properties.getAgentConnectionQueue(), (session) -> {
JMSContext ctx = jmsTemplate.getConnectionFactory().createContext();
TemporaryQueue tq = ctx.createTemporaryQueue();
properties.setJmsQueueReceiver(tq.getQueueName());
ctx.createConsumer(tq).setMessageListener(new MyMessageListener(inbound, properties));
return session.createObjectMessage(new AgentConnection(properties.getJmsQueueReceiver(), properties.getExternalIpAddress(), properties.getInternalIpAddress()));
});
if (message != null) {
AgentUseQueue auq = message.getBody(AgentUseQueue.class);
log.info("Connected. Use this queue to communicate now: {}", auq);
properties.setJmsQueueSender(auq.getQueueName());
properties.setLastMessageReceived(System.currentTimeMillis());
outbound.heartbeat(...);
} else {
log.info("Did not receive any response. Trying again in 15 seconds.");
}
} catch (Exception e) {
log.error("Error sending agent connection message:", e);
}
} else {
outbound.heartbeat(...);
}
}
}
出站
@Service
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class Outbound {
@Autowired
private Properties properties;
@Autowired
private JmsTemplate jmsTemplate;
... methods ...
public void heartbeat(RunningInstanceState[] states) {
log.info("Sending heartbeat. States: {}", new Object[] {states});
send(new HeartbeatMessage(states));
}
public boolean isConnected() {
return properties.getJmsQueueSender() != null;
}
private void send(Serializable message) {
if (isConnected()) {
try {
jmsTemplate.send(properties.getJmsQueueSender(), session -> session.createObjectMessage(message));
} catch (Exception je) {
log.warn("Error on JMS, reseting queues: {}", je.getMessage());
properties.resetJms();
}
}
}
}
已编辑
我在链接 http://s000.tinyupload.com/index.php?file_id=07555336945447914472 上创建小型 Maven 项目
它正在使用 HornetQ 运行。artemis配置注释在pom.xml、application.properties和Customizers
最佳答案
解决方案是将 TemporaryQueue 更改为 Queue。
// TemporaryQueue tq = session.createTemporaryQueue(); // only works for HornetQ
String clientQueue = UUID.randomUUID().toString();
Queue q = session.createQueue(clientQueue);
关于java - 消息监听器不起作用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44287372/
今天有小伙伴给我留言问到,try{...}catch(){...}是什么意思?它用来干什么? 简单的说 他们是用来捕获异常的 下面我们通过一个例子来详细讲解下
我正在努力提高网站的可访问性,但我不知道如何在页脚中标记社交媒体链接列表。这些链接指向我在 facecook、twitter 等上的帐户。我不想用 role="navigation" 标记这些链接,因
说现在是 6 点,我有一个 Timer 并在 10 点安排了一个 TimerTask。之后,System DateTime 被其他服务(例如 ntp)调整为 9 点钟。我仍然希望我的 TimerTas
就目前而言,这个问题不适合我们的问答形式。我们希望答案得到事实、引用资料或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visit the
我就废话不多说了,大家还是直接看代码吧~ ? 1
Maven系列1 1.什么是Maven? Maven是一个项目管理工具,它包含了一个对象模型。一组标准集合,一个依赖管理系统。和用来运行定义在生命周期阶段中插件目标和逻辑。 核心功能 Mav
我是一名优秀的程序员,十分优秀!