gpt4 book ai didi

java - Apache Camel : No rollback after RuntimeException

转载 作者:行者123 更新时间:2023-11-30 07:09:34 25 4
gpt4 key购买 nike

我对 Camel 的事务管理有疑问。

在我的示例中,我从 ZIP 中提取了一个 XML 文件,通过 JAXB 从 xml 中创建了一个 JPA 实体并将其写入数据库。然后我强制执行 RuntimeException。

我的期望是回滚插入的实体,但它们已经提交。

我在 ZIP 拆分器上放置了一个事务处理调用,以便处理所有包含的文件或不处理。聚合器负责在将元数据写入数据库之前将来自不同文件的元数据组合起来。

谁能解释一下代码中缺少什么或者我对 Camel 事务管理的误解在哪里?

提前致谢

阿德里安

...

private final class AggregationStrategyImplementation implements AggregationStrategy {
DocumentMetaDataContainer documentMetaDataContainer;

public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
documentMetaDataContainer = (DocumentMetaDataContainer) newExchange.getIn()
.getBody();
oldExchange = newExchange;
} else {
String header = String.valueOf(newExchange.getIn().getHeader("CamelFileName"));
System.out.println("aggregating " + header);
documentMetaDataContainer.putFileStreamToSpecificElement(header, newExchange
.getIn().getBody(byte[].class));

if (isDone()) {
oldExchange.getOut().setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS, true);
}
}
return oldExchange;
}

public boolean isDone() {
if (documentMetaDataContainer == null)
return false;
for (DocumentMetaData documentMetaData : documentMetaDataContainer
.getListOfDocumentMetaData()) {
if (documentMetaData.getDocumentFile() == null)
return false;
}
return true;
}
}


public void processImport() throws Exception {
ApplicationContext springContext = new ClassPathXmlApplicationContext(
"classpath:spring.xml");
final CamelContext camelContext = SpringCamelContext.springCamelContext(springContext);

RouteBuilder routeBuilder = new RouteBuilder() {
private static final String inbox = "/Development/ppaFiles";

JAXBContext jaxbContext = JAXBContext.newInstance(new Class[] {
com.business.services.DocumentMetaDataContainer.class
});
DataFormat jaxbDataFormat = new JaxbDataFormat(jaxbContext);

public void configure() {
from("file:"+inbox+"?consumer.delay=1000&noop=true")
.routeId("scanDirectory")
.choice()
.when(header("CamelFileName").regex("badb_(.)*.zip"))
.setHeader("msgId").simple("${header.CamelFileName}_${date:now:S}")
.log("processing zip file, aggregating by ${header.msgId}")
.to("direct:splitZip")
.end();


from("direct:splitZip")
.routeId("splitZip")
.transacted()
.split(new ZipSplitter())
.streaming()
.choice()
.when(header("CamelFileName").regex("(.)*_files.xml")) // Meta File
.to("direct:toDocumentMetaData")
.otherwise() // PDF XML Files
.to("direct:toByteArray")
.end();

from("direct:toByteArray")
.routeId("toByteArray")
.convertBodyTo(byte[].class)
.to("direct:aggregateZipEntries");


from("direct:toDocumentMetaData")
.routeId("toDocumentMetaData")
.split()
// root tag name in xml file
.tokenizeXML("files")
.unmarshal(jaxbDataFormat)
.to("direct:aggregateZipEntries");


from("direct:aggregateZipEntries")
.routeId("aggregateZipEntries")
// force to start with meta data file ('..._files.xml')
.resequence(simple("${header.CamelFileName.endsWith('_files.xml')}"))
.allowDuplicates()
.reverse()
.aggregate(new AggregationStrategyImplementation())
.header("msgId")
.completionTimeout(2000L)
.multicast()
.to("direct:saveDocumentMetaData", "direct:doErrorProcessing");


from("direct:saveDocumentMetaData")
.routeId("saveDocumentMetaData")
.split(simple("${body.listOfDocumentMetaData}"))
.multicast()
.to("jpa://com.business.persistence.entities.DocumentMetaData"+
"?persistenceUnit=persistenceUnit"+
"&consumer.transacted=true"+
"&transactionManager=#transactionManager"+
"&flushOnSend=false")
.log("processDocumentMetaData: ${body.getName()}");


from("direct:doErrorProcessing")
.routeId("doErrorProcessing")
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
throw new RuntimeException("Error");
}
});
}
};

camelContext.addRoutes(routeBuilder);

camelContext.start();
Thread.sleep(10000);
camelContext.stop();
}

...

最佳答案

似乎在“splitZip”路由中创建的事务直到“saveDocumentMetaData”和“doErrorProcessing”中的保存事件才扩展,perhaps due to using an aggregator without a persistent store .这就是为什么在“doErrorProcessing”中抛出的异常不会导致“saveDocumentMetaData”中的回滚。

要在一个事务中包含“saveDocumentMetaData”和“doErrorProcessing”,为多播创建一个新事务:

// ...
.aggregate(new AggregationStrategyImplementation())
.header("msgId")
.completionTimeout(2000L)
.to("direct:persist");
// new transacted route
from("direct:persist")
.routeId("persist")
.transacted()
.multicast()
.to("direct:saveDocumentMetaData", "direct:importBalanceSheet");

关于java - Apache Camel : No rollback after RuntimeException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22913796/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com