gpt4 book ai didi

java - Hibernate 在异步 EJB 应用程序中的奇怪行为。竞赛条件?

转载 作者:行者123 更新时间:2023-12-02 13:34:31 26 4
gpt4 key购买 nike

问题简要描述:

我正在研究 Java EE 应用程序处理来自金融市场的消息。应用程序部署在应用程序服务器上:Wildfly-8.2.0.Final

应用程序中的消息流如下图所示:

MDB1
\
StrategyManager(@Singleton) -> StrategyRunner(@Singleton) -> SomeStrategy(@Singleton)
/
MDB2

异步调用 EJB - SomeStrategy(@Singleton) 正在对定义为 JPA 实体的 JPA 模型执行读取|更新操作:StrategyEntity-< ParamEntity [ParamEntity 与 StrategyEntity 存在 @ManyToOne、FetchType.EAGER、CascadeType.ALL 关系]。

StrategyManagerStrategyRunner是辅助EJB解耦,即数据库操作形成SomeStrategy(@Singleton)中包含的主要业务逻辑。

StrategyEntity 在 SomeStrategySingletonEJB 执行 Read|Update 操作之前刷新并保存。

@Slf4j
@Singleton
@Local
@Startup
public class StrategyRunner {

@EJB
private SomeStrategySingletonEJB someStrategySingletonEJB;

@EJB
private StrategyDao strategyDao;


public void runStrategy(StrategyEntity strategyEntity, OrderBookAggregated orderBookAggregated) {
strategyDao.refresh(strategyEntity);
log.info("StrategyEntity after refresh: {}", startegyEntity);
if (!strategyEntity.isRunning()) return;

someStrategySingletonEJB.updateOnOrderBook(strategyEntity);
log.info("StrategyEntity before save: {}", startegyEntity);
strategyDao.save(strategyEntity);

}

public void runStrategy(StrategyEntity strategyEntity, OrderQueryReport orderQueryReport) {
strategyDao.refresh(strategyEntity);
log.info("StrategyEntity after refresh: {}", startegyEntity);
if (!strategyEntity.isRunning()) return;
someStrategySingletonEJB.updateOnExecutionReport(strategyEntity, orderQueryReport);
log.info("StrategyEntity before save: {}", startsingletoneegyEntity);
strategyDao.save(strategyEntity);
}

}

方法 runStrategy() 是从 MDB 中同时调用的。 Singleton 的默认锁类型是 WRITE 锁,因此方法应该永远不要同时运行。 StrategyEntity是从上层(StrategyManager)的数据库中检索的,请参见DETAIL描述。问题是:有时(非常靠后 - 平均 2000 条消息一次)“保存前的 StrategyEntity”会有所不同“刷新后的StrategyEntity”!它看起来像是某种竞争条件。如果我替换strategyDao(访问RDBMS)使用 EJB @Singleton 实现的简单缓存,问题就消失了。我推断出问题Hibernate操作作为应用程序在用Cache替换数据库层后可以在繁重的工作负载下完美运行。你有什么想法吗?

hibernate 属性:

<persistence-unit name="AlgorithmEnginePU">
<provider>org.hibernate.jpa.HibernatePersistenceProvider</provider>
<jta-data-source>java:jboss/AlgorithmEngineDS</jta-data-source>
<exclude-unlisted-classes>false</exclude-unlisted-classes>
<!--<class>com.main.model.configuration.ExchangeInformation</class>-->
<properties>

<!-- Properties for Hibernate -->
<property name="hibernate.default_schema" value="algorithm_engine"/>
<property name="hibernate.dialect" value="org.hibernate.dialect.PostgreSQLDialect"/>
<property name="hibernate.show_sql" value="false"/>
<property name="hibernate.transaction.jta.platform" value="org.hibernate.service.jta.platform.internal.JBossAppServerJtaPlatform"/>
<!--<property name="hibernate.hbm2ddl.auto" value="create" />-->
</properties>
</persistence-unit>
<小时/>

问题的详细描述:

这是第一个 MDB。当strategyManager被锁定时,接收到的消息不会排队也不会被处理,并且onMessage()返回而不调用任何EJB。请注意,序列化的 OrderBookEntity 只是传输对象,并且仅用作 POJO。

@MessageDriven(name = "OrderBookTReceiver", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/quotationData"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "subscriptionDurability", propertyValue = "NonDurable")})
@DependsOn({"OrderBookManager"})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class OrderBookReceiver implements MessageListener{

@EJB
private StrategyManager strategyManager;

@EJB
private StrategyOrderBookLock strategyLock;

@Override
public void onMessage(Message message) {

if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
try {
Serializable serializable = objectMessage.getObject();

if (serializable instanceof OrderBookEntity) {
OrderBookEntity orderBookEntity = (OrderBookEntity) serializable;

if(strategyLock.tryLock()){
try {
strategyManager.updateOrderBook(orderBookEntity);
}finally{
strategyLock.unlock();
}
}
}
} catch (JMSException e) {
log.error(e.toString());
}


}
}
}

第二个MDB接收其他类型的消息并将它们排队以便在注入(inject)的单例上进行处理遵循默认的WRITE_LOCK:

@Slf4j
@MessageDriven(name = "MessageReceiver", activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/tradeAgentReply"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Topic"),
@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge")})
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED)
public class TradeAgentClientMDB implements MessageListener {

@EJB
private StrategyManager strategyManager;

@Override
public void onMessage(Message message) {
(...)

try {

TradeQueryReport tradeQueryReport = (TradeQueryReport) serializableMessage;

if (tradeQueryReport instanceof OrderQueryReport ) {
OrderQueryReport orderQueryReport = (OrderQueryReport) tradeQueryReport;
strategyManager.updateOrder(orderQueryReport);
}

} catch (JMSException e) {
e.printStackTrace();
}

}
}

这里是 EJB(StrategyRunner @EJB 在简要说明部分中进行了描述):

@Slf4j
@Singleton
@Local
@Startup
public class StrategyManager {

@Inject
private StrategyDao strategyDao;

@EJB
private StrategyRunner strategyRunner;

@Asynchronous
public void updateOrderBook(OrderBookAggregated orderBookAggregated) {

StrategyEntity strategy = strategyDao.findStrategyByExchange(orderBookAggregated.getAccount().getExchangeEntity());

if(strategyEntity == null) return;

strategyRunner.runStrategy(strategyEntity, orderBookAggregated);
}

@Asynchronous
public void updateOrder(OrderQueryReport orderQueryReport) {

StrategyEntity strategy = strategyDao.findStrategyByOrder(orderQueryReport.getClientOrderId());

if(strategy == null) return;

strategyRunner.runStrategy(strategy, orderQueryReport);
}

(...)
}

@Singleton
@Local
@Startup
public class StrategyOrderBookLock {
private java.util.concurrent.locks.Lock updateOrderBookLock = new ReentrantLock();


@Lock(LockType.READ)
public boolean tryLock() {
return updateOrderBookLock.tryLock();
}

@Lock(LockType.READ)
public void unlock() {
updateOrderBookLock.unlock();
}
}

最佳答案

代码本身看起来不错,我找不到任何明显的缺陷。我记得不久前我自己也遇到过类似的情况,日志输出与数据库中的内容不匹配。这是我的情况:

  • 记录语句。这并没有立即记录;相反,收集数据并将日志事件添加到队列中。这意味着日志事件对该对象有硬引用。
  • 插入/更新数据库
  • 交易结束
  • 第二个线程从缓存中获取对象。
  • 对象已修改
  • 另一个线程开始处理日志事件
  • 日志事件已转换为字符串。

由于在最后一步之前修改了对象,因此日志输出是意外的:事务结束后的更改“泄漏”到日志记录中。

解决方案:确保记录器仅获取不可变数据。一个简单的解决方法是

log.info("StrategyEntity after refresh: {}", startegyEntity.toString());

关于java - Hibernate 在异步 EJB 应用程序中的奇怪行为。竞赛条件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27698024/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com