gpt4 book ai didi

java - 在并发交换期间隔离有状态bean的最正确方法

转载 作者:行者123 更新时间:2023-11-30 03:44:47 32 4
gpt4 key购买 nike

假设有一条调用有状态 bean 的路由:

<camel:route id="Concurrently-called-route">
<camel:from uri="direct:concurrentlyCalledRoute"/>
<camel:bean ref="statefullBean" method="setSomeState"/>
<camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

消息可以沿着该路径同时发送,即从并发线程调用 ProducerTemplaterequestBody 方法。因此,如果正在进行两次交换,并且在一次交换期间调用 setSomeState 和在另一次交换期间执行的 setSomeStategetSomeDataDependingOnState 调用之间,就会出现问题。我看到有两种方法可以解决这个问题,每种方法都有一个缺点。

使用SEDA

<camel:route id="Councurrently-called-route">
<camel:from uri="direct:concurrentlyCalledRoute"/>
<camel:to uri="seda:sedaRoute"/>
</camel:route>

<camel:route id="SEDA-route">
<camel:from uri="seda:sedaRoute"/>
<camel:bean ref="statefullBean" method="setSomeState"/>
<camel:bean ref="statefullBean" method="getSomeDataDependingOnState"/>
</camel:route>

在这种情况下,从不同线程发送的消息会聚集在 SEDA 端点的队列中。来自该队列的消息在沿着 SEDA-route 行进时在一个线程中进行处理。因此一条消息的处理不会干扰另一条消息的处理。但是,如果有许多线程向 concurrentlyCalledRoute 发送消息,SEDA-route 将成为瓶颈。如果使用多个线程来处理 seda 队列中的项目,则会再次出现并发调用有状态 bean 的问题。

另一种方法 - 使用自定义范围。

自定义范围

Spring 框架允许实现自定义范围。因此,我们能够实现一个范围,为每次交换存储一个单独的 bean 实例。

public class ExchangeScope implements Scope {

private Map<String, Map<String,Object>> instances = new ConcurrentHashMap<>();

private Map<String,Runnable> destructionCallbacks = new ConcurrentHashMap<>();

private final ThreadLocal<String> currentExchangeId = new ThreadLocal<>();

public void activate(String exchangeId) {
if (!this.instances.containsKey(exchangeId)) {
Map<String, Object> instancesInCurrentExchangeScope = new ConcurrentHashMap<>();
this.instances.put(exchangeId, instancesInCurrentExchangeScope);
}
this.currentExchangeId.set(exchangeId);
}

public void destroy() {
String currentExchangeId = this.currentExchangeId.get();
Map<String,Object> instancesInCurrentExchangeScope = instances.get(currentExchangeId);
if (instancesInCurrentExchangeScope == null)
throw new RuntimeException("ExchangeScope with id = " + currentExchangeId + " doesn't exist");
for (String name : instancesInCurrentExchangeScope.keySet()) {
this.remove(name);
}
instances.remove(currentExchangeId);
this.currentExchangeId.set(null);
}

@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
// selects by name a bean instance from a map storing instances for current exchange
// creates a new bean instance if necessary
}

@Override
public Object remove(String name) {
// removes a bean instance
}

@Override
public void registerDestructionCallback(String name, Runnable callback) {
this.destructionCallbacks.put(name, callback);
}

@Override
public Object resolveContextualObject(String name) {
String currentExchangeId = this.currentExchangeId.get();
if (currentExchangeId == null)
return null;

Map<String,Object> instancesInCurrentExchangeScope = this.instances.get(currentExchangeId);
if (instancesInCurrentExchangeScope == null)
return null;

return instancesInCurrentExchangeScope.get(name);
}

@Override
public String getConversationId() {
return this.currentExchangeId.get();
}
}

现在我们可以注册这个自定义作用域并将 statefullBean 声明为交换作用域:

<bean id="exchangeScope" class="org.my.ExchangeScope"/>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="ExchangeScope" value-ref="exchangeScope"/>
</map>
</property>
</bean>

<bean id="statefullBean" class="org.my.StatefullBean" scope="ExchangeScope"/>

要使用交换范围,我们应该在发送消息之前调用 ExchangeScopeactivate 方法,然后调用 destroy :

this.exchangeScope.activate(exchangeId);
this.producerTemplate.requestBody(request);
this.exchangeScope.destroy(exchangeId);

通过此实现,交换作用域实际上是线程作用域。这是一个缺点。例如,如果在路由中使用多线程拆分器,则它将无法从拆分器创建的线程中调用交换作用域 bean,因为对 bean 的调用将在与启动交换的线程不同的线程中执行。

有什么想法可以解决这些缺点吗?在并发交换期间是否有完全不同的方法来隔离有状态 bean?

最佳答案

另一个需要考虑的替代方案是不要让你的 beans 有状态。您可以将状态数据存储在消息本身而不是 bean 中,因此您的方法将类似于:

public class StatefulBean {
public StateInfo setSomeState(Message msg) {...}

public void getSomeDataDependingOnState(StateInfo stateinfo) {...}
}

关于java - 在并发交换期间隔离有状态bean的最正确方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26013925/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com