gpt4 book ai didi

java - 从 JMS MessageListener 发出回滚信号

转载 作者:搜寻专家 更新时间:2023-10-30 19:58:58 25 4
gpt4 key购买 nike

我一直在使用 JMS 和 ActiveMQ。一切都在创造奇迹。我没有使用 spring,我也不会。

javax.jms.MessageListener 接口(interface)只有一个方法,onMessage。在实现中,有可能会抛出异常。如果实际上抛出了异常,那么我说消息没有得到正确处理,需要重试。所以,我需要 ActiveMQ 稍等片刻,然后重试。即我需要抛出的异常来回滚 JMS 事务。

我怎样才能完成这样的行为?

也许在 ActiveMQ 中有一些我找不到的配置。

或者...也许可以取消向消费者注册 MessageListener 并自己使用消息,循环如下:

while (true) {
// ... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
// ... some more administrative stuff
}

在几个线程中,而不是注册监听器。

或者...我可以通过某种方式装饰/AOP/字节操作 MessageListener 来自己完成此操作。

你会走哪条路,为什么?

注意:我无法完全控制MessageListener的代码。

编辑概念验证测试:

@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);

BrokerService brokerService = new BrokerService();

String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);

activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

pooledConnectionFactory.start();

Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {

@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();

{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();

MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));

producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");

consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();

brokerService.stop();

assertEquals(3, atomicInteger.get());
}

最佳答案

如果你想使用 SESSION_TRANSACTED 作为你的确认模式,那么你需要设置一个 RedeliveryPolicy on your Connection/ConnectionFactory . This page on ActiveMQ's website还包含一些有关您可能需要执行的操作的有用信息。

由于您没有使用 Spring,您可以使用类似于以下代码(取自上述链接之一)的内容设置 RedeliveryPolicy:

RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

编辑将您的代码片段添加到答案中,下面显示了它如何处理事务。在注释掉 Session.rollback() 方法的情况下尝试这段代码,您会发现使用 SESION_TRANSACTED 和 Session.commit/rollback 可以按预期工作:

@Test
public void test() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);

BrokerService brokerService = new BrokerService();

String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);

activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");

PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

pooledConnectionFactory.start();

Connection connection = pooledConnectionFactory.createConnection();
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {

public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
session.commit();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
session.rollback();
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
session.commit();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();

{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();

MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));

producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");

consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();

assertEquals(3, atomicInteger.get());
}

关于java - 从 JMS MessageListener 发出回滚信号,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7214086/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com