gpt4 book ai didi

java - 如何等待多线程发布订阅 channel 完成

转载 作者:行者123 更新时间:2023-11-30 07:18:48 28 4
gpt4 key购买 nike

我有一个 Spring Integration 项目,我想在其中通过多个操作同时处理一条消息。所以我设置了一个带有 task-executorpublish-subscribe-channel。但是我想在继续之前等待所有处理完成。我该怎么做?

<publish-subscribe-channel id="myPubSub" task-executor="my10ThreadPool"/>

<channel id="myOutputChannel"/>

<service-activator input-channel="myPubSub" output-channel="myOutputChannel"
ref="beanA" method="blah"/>
<service-activator input-channel="myPubSub" output-channel="myOutputChannel"
ref="beanB" method="blah"/>

<service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>

所以在上面的例子中,我希望我的 afterThreadingProcessorbeanAbeanB 完成它们的工作后只被调用一次。但是,在上面的 afterThreadingProcessor 中将被调用两次。

最佳答案

  1. 添加apply-sequence="true"到发布-订阅 channel (这会向消息添加默认关联数据,包括 correlationIdsequenceSizesequenceNumber,并允许在下游组件上使用默认策略)。

  2. 添加 <aggregator/>之前afterThreadingProcessor并路由两个 <service-activator/> 的输出就这样吧。

  3. 添加 <splitter/>在聚合器之后 - 默认拆分器会将聚合器生成的集合拆分为两条消息。

afterThreadingProcessor将为完成其工​​作的第二个线程上的每条消息调用一次。

您可以通过使用链来简化配置...

<chain input-channel="myOutputChannel">
<aggregator />
<splitter />
<service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>
</chain>

要对最终服务进行一次调用,只需将您的服务更改为使用 Collection<?>而不是添加拆分器。

编辑:

为了在评论 #3 中执行您想要的操作(在原始线程上运行最终服务),这应该可行...

<int:channel id="foo" />
<int:service-activator ref="twoServicesGateway" input-channel="foo"
output-channel="myOutputChannel" />

<int:gateway id="twoServicesGateway" default-request-channel="myPubSub"/>
<int:publish-subscribe-channel id="myPubSub" task-executor="my10ThreadPool"
apply-sequence="true"/>
<int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
ref="beanA" method="blah"/>
<int:service-activator input-channel="myPubSub" output-channel="aggregatorChannel"
ref="beanB" method="blah"/>
<int:aggregator input-channel="aggregatorChannel" />

<int:service-activator id="afterThreadingProcessor" input-channel="myOutputChannel" .../>

在这种情况下,网关封装了另外两个服务和聚合器;默认 service-interface是一个简单的 RequestReplyExchanger .调用线程将等待输出。由于聚合器没有 output-channel框架将回复发送给网关,等待线程将收到它,返回到<service-activator/>。然后将结果发送到最终服务。

你可能想放一个 reply-timeout在网关上,因为默认情况下,它将无限期地等待,并且如果其中一项服务返回空值,将永远不会收到聚合响应。

请注意,我缩进了网关流程只是为了表明它从网关运行,它们不是网关的子元素。

关于java - 如何等待多线程发布订阅 channel 完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15114340/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com