gpt4 book ai didi

redis - 遗漏消息使用redis的queue-outbound adapter分发任务

转载 作者:可可西里 更新时间:2023-11-01 11:41:54 26 4
gpt4 key购买 nike

我正在使用 s.i. 提供的两个组件。拥有一个有效的分配系统

消息(任务)被发送到这个名为 distribution 的 channel ;分发有一个与通知系统关联的窃听器,因此当消息通过分发时我可以通过 jconsole 或任务控制看到:

  <int:channel id="distribution">
<int:interceptors>
<int:wire-tap channel="distributionPublish"/>
</int:interceptors>
</int:channel>

然后我用redis做队列系统:

  <redis:queue-outbound-channel-adapter
id="toRedis" channel="distribution" queue="Qname"
auto-startup="true" extract-payload="false" />


<!-- a Queue Inbound Channel Adapter is available to 'right pop' messages
from a Redis List. -->
<redis:queue-inbound-channel-adapter
id="fromRedis" channel="execution" queue="Qname"
receive-timeout="1000" recovery-interval="1000" expect-message="true"
auto-startup="true"/>

使用以下配置和 2 个服务器,每个服务器占用 50% 的消息;主要是一台服务器有web服务接口(interface),把消息放入输入队列,然后所有订阅和阻塞的服务器都在获取消息(任务)

但是当我加速系统时,有消息丢失。我不知道为什么,也不知道我能做什么。我已经增加了 redis 池值,但这个问题仍然存在,

我是不是做错了什么,或者我怎样才能在分发组件中实现“重试”?

PD:我有窃听器来确定错误是在分发组件中

更新

也许这个配置可以提供帮助? (还没有测试,当我做的时候会更新)思路是有几个线程去redis

  <int:channel id="distribution">
<int:dispatcher task-executor="DistributionTaskExecutor"/>
<int:interceptors>
<int:wire-tap channel="distributionPublish"/>
</int:interceptors>
</int:channel>
<!-- to handle high demanding we use several threads to go to redis -->
<task:executor id="DistributionTaskExecutor" pool-size="2" />

<redis:queue-outbound-channel-adapter
id="toRedis" channel="distribution" queue="${instance}"
auto-startup="true" extract-payload="false" />

更新:

最后,我正在使用这个配置并且似乎运行良好或者。至少,更好:

 <int:channel id="distribution">
<int:queue capacity="50"/>
</int:channel>

<task:executor id='distributionExecutor' pool-size='25' queue-capacity='25' rejection-policy="CALLER_RUNS"/>

<redis:queue-outbound-channel-adapter
id="toRedis" channel="distribution" queue="${instance}"
auto-startup="true" extract-payload="false">
<int:poller task-executor='distributionExecutor' fixed-delay='500'>
</int:poller>
</redis:queue-outbound-channel-adapter>

最佳答案

为了完成这个问题,我将我的评论移到这里作为答案。

<task:executor/>配置具有非常低的并发性 (2) 和 AbortPolicy默认情况下,由于 rejection 而对 Redis 执行的任务已丢失.

只需要依赖单线程DirectChannel或者确保任务执行器有足够大的线程池并将策略配置为 CallerRunsPolicy .

关于redis - 遗漏消息使用redis的queue-outbound adapter分发任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24557742/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com