- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我希望这是一个简单的配置问题,但我似乎无法弄清楚它可能是什么。
设置
问题
我的应用程序启动正常并开始处理来自 Amazon SQS 的消息。一段时间后,我看到以下警告
2020-02-01 04:16:21.482 LogLevel=WARN 1 --- [ecutor-thread14] o.s.j.l.DefaultMessageListenerContainer : Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers.
上面的警告被打印多次,最终我看到以下两条INFO消息
2020-02-01 04:17:51.552 LogLevel=INFO 1 --- [ecutor-thread40] c.a.s.javamessaging.SQSMessageConsumer : Shutting down ConsumerPrefetch executor
2020-02-01 04:18:06.640 LogLevel=INFO 1 --- [ecutor-thread40] com.amazon.sqs.javamessaging.SQSSession : Shutting down SessionCallBackScheduler executor
以上 2 条消息将显示多次,并且在某个时刻,SQS 不再使用任何消息。我在日志中没有看到任何其他消息表明存在问题,但我没有从处理程序收到任何消息表明它们正在处理消息(我有 2 个~),并且我可以看到 AWS SQS 队列的消息数量在增长,并且年龄。
~:当我有一个处理程序时,这个确切的代码工作正常,当我添加第二个处理程序时,这个问题就开始了。
配置/代码
我意识到的第一个“警告”是由 ThreadPoolTaskExecutor 的货币引起的,但我无法获得正常工作的配置。这是我当前的 JMS 配置,我尝试了各种级别的最大池大小,除了根据池大小迟早启动警告之外没有任何实际影响
public ThreadPoolTaskExecutor asyncAppConsumerTaskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setThreadGroupName("asyncConsumerTaskExecutor");
taskExecutor.setThreadNamePrefix("asyncConsumerTaskExecutor-thread");
taskExecutor.setCorePoolSize(10);
// Allow the thread pool to grow up to 4 times the core size, evidently not
// having the pool be larger than the max concurrency causes the JMS queue
// to barf on itself with messages like
// "Number of scheduled consumers has dropped below concurrentConsumers limit, probably due to tasks having been rejected. Check your thread pool configuration! Automatic recovery to be triggered by remaining consumers"
taskExecutor.setMaxPoolSize(10 * 4);
taskExecutor.setQueueCapacity(0); // do not queue up messages
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
return taskExecutor;
}
这是我们创建的JMS容器工厂
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SQSConnectionFactory sqsConnectionFactory, ThreadPoolTaskExecutor asyncConsumerTaskExecutor) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(sqsConnectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
// The JMS processor will start 'concurrency' number of tasks
// and supposedly will increase this to the max of '10 * 3'
factory.setConcurrency(10 + "-" + (10 * 3));
factory.setTaskExecutor(asyncConsumerTaskExecutor);
// Let the task process 100 messages, default appears to be 10
factory.setMaxMessagesPerTask(100);
// Wait up to 5 seconds for a timeout, this keeps the task around a bit longer
factory.setReceiveTimeout(5000L);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
我根据在互联网上找到的内容添加了 setMaxMessagesPerTask 和 setReceiveTimeout 调用,如果没有这些调用,并且在各种设置(50、2500L、25、1000L、等等...)
我们创建一个默认的 SQS 连接工厂
public SQSConnectionFactory sqsConnectionFactory(AmazonSQS amazonSQS) {
return new SQSConnectionFactory(new ProviderConfiguration(), amazonSQS);
}
最后处理程序看起来像这样
@JmsListener(destination = "consumer-event-queue")
public void receiveEvents(String message) throws IOException {
MyEventDTO myEventDTO = jsonObj.readValue(message, MyEventDTO.class);
//messageTask.process(myEventDTO);
}
@JmsListener(destination = "myalert-sqs")
public void receiveAlerts(String message) throws IOException, InterruptedException {
final MyAlertDTO myAlert = jsonObj.readValue(message, MyAlertDTO.class);
myProcessor.addAlertToQueue(myAlert);
}
您可以看到,在第一个函数(receiveEvents)中,我们只是从队列中取出消息并退出,我们还没有为此实现处理代码。第二个函数 (receiveAlerts) 获取消息,myProcessor.addAlertToQueue 函数创建一个可运行对象并将其提交到线程池进行处理在未来的某个时刻。
只有当我们添加receiveAlerts函数时,问题才开始(警告、信息和消费消息失败),之前只有另一个函数存在,我们没有看到这种行为.
更多
这是一个较大项目的一部分,我正在努力将此代码分解为一个较小的测试用例,看看是否可以重复此问题。我将发布后续结果。
同时
我希望这只是一个配置问题,更熟悉此问题的人可以告诉我我做错了什么,或者有人可以提供一些关于如何纠正此问题以使其正常工作的想法和评论。
谢谢!
最佳答案
在与这个问题斗争了一段时间后,我想我终于解决了它。
该问题似乎是由“DefaultJmsListenerContainerFactory”引起的,该工厂使用“@JmsListener”注释为 EACH 方法创建了一个新的“DefaultJmsListenerContainer”。最初编写代码的人认为它只为应用程序调用一次,创建的容器将被重复使用。所以问题有两个方面
希望这可以帮助将来遇到类似问题的其他人......
关于spring - DefaultMessageListenerContainer 停止处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60042938/
我希望这是一个简单的配置问题,但我似乎无法弄清楚它可能是什么。 设置 Spring-Boor 2.2.2.RELEASE 云启动器 云启动器-aws spring-jms spring-cloud-d
我有以下配置: // Some attributes 以及以下监听器: public class myListener implements MessageListener
我有一个 DefaultMessageListenerContainer 配置如下: DefaultMessageListenerContainer container = new DefaultMe
我希望这是一个简单的配置问题,但我似乎无法弄清楚它可能是什么。 设置 Spring-Boor 2.2.2.RELEASE 云启动器 云启动器-aws spring-jms spring-cloud-d
我正在将 Spring JMS 与以下上下文 XML 文件一起使用。 我的应用程序是命令行独立的,看起来像这样: public static void main(String[]
是否有可能在 onMessage 方法内部知道 MessageListener 正在监听哪个队列? 我的 Spring-config(其中一部分):
我有一个 DefaultMessageListenerContainer,它(在我看来)没有按比例放大。 Container 被定义为监听一个队列,其中有 100 条消息。 我希望容器可以达到任何长度
我正在努力模拟(使用 Mockito)DefaultMessageListenerContainer (org.springframework.jms.listener.DefaultMessageL
使用DefaultJmsListenerContainerFactory有什么好处在 DefaultMessageListenerContainer ? 如果我直接配置 DMLC,我会通过调用 isR
我已将 Spring DefaultMessageListenerContainer 配置为 ActiveMQ 消费者,使用队列中的消息。我们称之为“Test.Queue”我将此代码部署在 4 台不同
我是 Spring Framework 的新手,我的问题如下: 我想实例化 DefaultMessageListenerContainer以编程方式,我使用的代码是: DefaultMessageLi
对于我当前的项目,我需要使用来自许多目的地(从数百到 20 或 30k)的消息,所有目的地都是主题。目前(对于初始负载测试)所有消息都是在同一台服务器上本地创建的,在线程池中。 我当前的 spring
我有一个要求,我在一个队列中有消息,消息选择器的数量是可配置的。我需要并行处理这些消息的地方。 经过深思熟虑,我发现使用多个 DefaultMessageListenerContainer 会产生很好
如果我使用 DefaultMessageListenerContainer 的 Spring要接收 JMS 消息,即使我设置了 sessionAcknowledgeMode,我也不会重新发送 JMS
我开发了使用 Spring JMS 的项目来接收来自队列的消息。并部署Websphere应用服务器(WAS 7.5)集群环境。一旦部署到服务器中,它就工作正常。后来我更新了记录器信息并部署到服务器中。
我正在使用 spring 集成来监听 ibm mq,我想在 hibernate 模式下部署我的应用程序并在需要的时间启动它。因此,我使用了 DefaultMessageListenerContaine
我在 DefaultMessageListenerContainer 内使用 SimpleAsyncTaskExecutor,并且我想使用 JMX mbean 监视 Activity 线程计数。我创建
我有一个案例,我想在同一个“主”线程中运行 DefaultMessageListenerContainer。现在它使用 SimpleAsyncTaskExecutor 每次收到消息时都会生成新线程。
我已经为 jms 监听器配置了 spring 以使用来自 hornetQ 的消息,如下所示。
我正在使用 Spring JMS 连接到 Websphere MQ 服务器。我实现了 SessionAwareListener 接口(interface)来创建自定义监听器,为业务逻辑重用旧代码。 在
我是一名优秀的程序员,十分优秀!