gpt4 book ai didi

java - Spring Integration - 队列 channel +服务激活器轮询器耗尽线程池

转载 作者:行者123 更新时间:2023-11-29 08:55:15 27 4
gpt4 key购买 nike

我有一个简单的 spring 集成应用程序,我试图在其中将任务发布到队列 channel ,然后让工作人员获取任务并执行它。 (来自具有多个并发工作人员的池)。

我发现线程池很快耗尽,任务被拒绝。

这是我的配置:

<int:annotation-config />
<task:annotation-driven executor="executor" scheduler="scheduler"/>
<task:executor id="executor" pool-size="5-20" rejection-policy="CALLER_RUNS" />
<task:scheduler id="scheduler" pool-size="5"/>


<int:gateway service-interface="com.example.MyGateway">
<int:method name="queueForSync" request-channel="worker.channel" />
</int:gateway>
<int:channel id="worker.channel">
<int:queue />
</int:channel>

<bean class="com.example.WorkerBean" id="workerBean" />
<int:service-activator ref="workerBean" method="doWork" input-channel="worker.channel">
<int:poller fixed-delay="50" task-executor="executor" receive-timeout="0" />
</int:service-activator>

这个问题非常与我不久前问过的另一个问题很相似,here .主要区别是我在这里没有使用 AMQP 消息代理,只是内部 spring 消息 channel 。

我没能找到 concurrent-consumer 的类比 Vanilla 泉 channel 中的概念。

此外,我采用了 Gary Russell 建议的配置:

To avoid this, simply set the receive-timeout to 0 on the <poller/>

尽管如此,我还是觉得池子耗尽了。

这个目标的正确配置是什么?

顺便说一句 - 这里的另外两种气味表明我的配置有误:

  • 为什么我在 rejection-policy 时收到被拒绝的异常?是CALLER_RUNS
  • queued tasks = 1000 异常开始出现.鉴于执行器上没有队列容量,队列不应该是无界的吗?

显示的异常堆栈跟踪:

[Mon Dec 2013 17:44:57.172] ERROR [task-scheduler-6] (org.springframework.integration.handler.LoggingHandler:126) - org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@48e83911[Running, pool size = 20, active threads = 20, queued tasks = 1000, completed tasks = 48]] did not accept task: org.springframework.integration.util.ErrorHandlingTaskExecutor$1@a5798e
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:244)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:231)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.springframework.integration.util.ErrorHandlingTaskExecutor$1@a5798e rejected from java.util.concurrent.ThreadPoolExecutor@48e83911[Running, pool size = 20, active threads = 20, queued tasks = 1000, completed tasks = 48]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:241)
... 11 more

最佳答案

最好的猜测是您在上下文中的其他地方有另一个 executor bean。

打开调试日志并查找 ...DefaultListableBeanFactory] ​​Overriding bean definition for bean 'executor'

默认队列容量为 Integer.MAX_VALUE

关于java - Spring Integration - 队列 channel +服务激活器轮询器耗尽线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20605206/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com