gpt4 book ai didi

spring - 如何在 AUTO_ACKNOWLEDGE JMS session 场景中模拟消息重新传递?

转载 作者:行者123 更新时间:2023-12-04 14:49:16 27 4
gpt4 key购买 nike

在下面的测试中,我试图模拟以下场景:

  • 启动消息队列。
  • 设计为在消息处理期间失败的使用者已启动。
  • 产生一条消息。
  • 消费者开始处理消息。
  • 在处理过程中抛出异常以模拟消息处理失败。失败的消费者被停止。
  • 另一个消费者开始意图获取重新传递的消息。

  • 但是我的测试失败了,消息没有重新传递给新的消费者。我会很感激这方面的任何提示。

    MessageProcessingFailureAndReprocessingTest.java
    @ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
    loader=JavaConfigContextLoader.class)
    public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
    testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
    testScenario.stop();
    }

    @Test public void
    should_reprocess_task_after_processing_failure() {
    try {
    Thread.sleep(20*1000);

    assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
    "task-1",
    })));
    } catch (InterruptedException e) {
    fail();
    }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
    @Autowired
    public BrokerService broker;

    @Autowired
    public MockTaskProducer mockTaskProducer;

    @Autowired
    public FailingWorker failingWorker;

    @Autowired
    public SucceedingWorker succeedingWorker;

    @Autowired
    public TaskScheduler scheduler;

    public void start() {
    Date now = new Date();
    scheduler.schedule(new Runnable() {
    public void run() { failingWorker.start(); }
    }, now);

    Date after1Seconds = new Date(now.getTime() + 1*1000);
    scheduler.schedule(new Runnable() {
    public void run() { mockTaskProducer.produceTask(); }
    }, after1Seconds);

    Date after2Seconds = new Date(now.getTime() + 2*1000);
    scheduler.schedule(new Runnable() {
    public void run() {
    failingWorker.stop();
    succeedingWorker.start();
    }
    }, after2Seconds);
    }

    public void stop() throws Exception {
    succeedingWorker.stop();
    broker.stop();
    }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
    "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
    @Autowired
    private ConnectionFactory jmsFactory;

    @Bean
    public FailureReprocessTestScenario testScenario() {
    return new FailureReprocessTestScenario();
    }

    @Bean
    public MockTaskProducer mockTaskProducer() {
    return new MockTaskProducer();
    }

    @Bean
    public FailingWorker failingWorker() {
    TaskListener listener = new TaskListener();
    FailingWorker worker = new FailingWorker(listenerContainer(listener));
    listener.setProcessor(worker);
    return worker;
    }

    @Bean
    public SucceedingWorker succeedingWorker() {
    TaskListener listener = new TaskListener();
    SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
    listener.setProcessor(worker);
    return worker;
    }

    private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
    DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
    listenerContainer.setConnectionFactory(jmsFactory);
    listenerContainer.setDestinationName("tasksQueue");
    listenerContainer.setMessageListener(listener);
    listenerContainer.setAutoStartup(false);
    listenerContainer.initialize();
    return listenerContainer;
    }

    }

    public static class FailingWorker implements TaskProcessor {
    private Logger LOG = Logger.getLogger(FailingWorker.class.getName());

    private final DefaultMessageListenerContainer listenerContainer;

    public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
    this.listenerContainer = listenerContainer;
    }

    public void start() {
    LOG.info("FailingWorker.start()");
    listenerContainer.start();
    }

    public void stop() {
    LOG.info("FailingWorker.stop()");
    listenerContainer.stop();
    }

    @Override
    public void processTask(Object task) {
    LOG.info("FailingWorker.processTask(" + task + ")");
    try {
    Thread.sleep(1*1000);
    throw Throwables.propagate(new Exception("Simulate task processing failure"));
    } catch (InterruptedException e) {
    LOG.log(Level.SEVERE, "Unexpected interruption exception");
    }
    }
    }

    public static class SucceedingWorker implements TaskProcessor {
    private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());

    private final DefaultMessageListenerContainer listenerContainer;

    public final List<String> processedTasks;

    public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
    this.listenerContainer = listenerContainer;
    this.processedTasks = new ArrayList<String>();
    }

    public void start() {
    LOG.info("SucceedingWorker.start()");
    listenerContainer.start();
    }

    public void stop() {
    LOG.info("SucceedingWorker.stop()");
    listenerContainer.stop();
    }

    @Override
    public void processTask(Object task) {
    LOG.info("SucceedingWorker.processTask(" + task + ")");
    try {
    TextMessage taskText = (TextMessage) task;
    processedTasks.add(taskText.getText());
    } catch (JMSException e) {
    LOG.log(Level.SEVERE, "Unexpected exception during task processing");
    }
    }
    }

    }

    TaskListener.java
    public class TaskListener implements MessageListener {

    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
    processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
    this.processor = processor;
    }

    }

    MockTaskProducer.java
    @Configurable
    public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    private Destination destination;

    private int taskCounter = 0;

    public void produceTask() {
    LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");

    taskCounter++;

    jmsTemplate.send(destination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    TextMessage message = session.createTextMessage("task-" + taskCounter);
    return message;
    }
    });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
    throws BeansException {
    destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
    }

    最佳答案

    显然是我昨天查看的文档来源 Creating Robust JMS Applications以某种方式误导我(或者我可能理解错误)。尤其是那段话:

    Until a JMS message has been acknowledged, it is not considered to be successfully consumed. The successful consumption of a message ordinarily takes place in three stages.

    1. The client receives the message.
    2. The client processes the message.
    3. The message is acknowledged. Acknowledgment is initiated either by the JMS provider or by the client, depending on the session acknowledgment mode.


    我假设 AUTO_ACKNOWLEDGE 确实如此 - 在监听器方法返回结果后确认消息。但是根据 JMS 规范,它有点不同,Spring 监听器容器正如预期的那样不会尝试改变 JMS 规范的行为。这就是 的javadoc AbstractMessageListenerContainer 不得不说 - 我已经强调了重要的句子:

    The listener container offers the following message acknowledgment options:

    • "sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): Automatic message acknowledgment before listener execution; no redelivery in case of exception thrown.
    • "sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.
    • "sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": Lazy message acknowledgment during or after listener execution; potential redelivery in case of exception thrown.
    • "sessionTransacted" set to "true": Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of exception thrown.


    所以我的解决方案的关键是 listenerContainer.setSessionTransacted(true);
    我面临的另一个问题是 JMS 提供者不断将失败的消息重新传递回在消息处理过程中失败的同一个使用者。我不知道 JMS 规范是否给出了提供者在这种情况下应该做什么的规定,但对我有用的是使用 listenerContainer.shutdown();为了断开失败的消费者并允许提供者重新传递消息并为另一个消费者提供机会。

    关于spring - 如何在 AUTO_ACKNOWLEDGE JMS session 场景中模拟消息重新传递?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9871069/

    27 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com