gpt4 book ai didi

java - 与Camel并行处理大型SQL表

转载 作者:行者123 更新时间:2023-11-30 05:42:28 25 4
gpt4 key购买 nike

我正在尝试使用Apache Camel从Informix表中每天处理约700万行,但我不知道该如何完成。

我第一次尝试使用非常少的数据集(约5万行)是使用.split(body()).parallelProcessing(),如下所示:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing in them in a synchronized list
.end().to("direct:transform-data");

from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();


当我在 .bean(QueryTable.class, "queryData").split(body()).parallelProcessing()上用500k行尝试它时,这当然会导致“ OutOfMemory”错误,因为它首先在解析之前尝试缓存查询中的所有数据。我尝试将 fetchSize设置为约100,但出现相同的错误,并且使用 maxRows只会得到我指定的行数,而忽略其余行。

我的下一个尝试是使用诸如 sql-componentjdbc之类的Camel组件之一,并尝试使用Splitter在单独的线程中处理每一行,但我遇到了同样的问题。

sql:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same


jdbc:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()


我的最后一次尝试是将 maxMessagesPerPoll用于sql,将 outputType=StreamList用于jdbc组件,但是不幸的是,前者一次只能处理一行(并且必须如此使用它),而后者却给了我 java.sql.SQLException: Cursor not open异常。

sql:

from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component


jdbc:

.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception


最终目标是能够处理数百万行而不消耗太多内存,以防止出现“ OutOfMemory”错误。我的想法是,如果可能的话,请执行以下操作:


在石英cron-trigger上创建我的查询
获得并分组N个结果
发送一组要处理的结果(在另一个线程中),同时获取另一组
重复直到所有数据都已处理


我知道这个问题与 this one类似,但是答案并不能真正解决我的问题。我还注意到,在sql组件的文档中,它为生产者提供了 outputType=StreamList选项,但是在2.18版和更高版本中实现,而在我使用2.14.1版时。

任何帮助和提示将非常有帮助!

谢谢。

其他一些信息:
Apache Camel版本:2.14.1
数据库:Informix

最佳答案

经过大量研究,更多的反复试验和NotaJD的tips,我找到了可以解决的解决方案(仍在测试中)。实际上就是这两种解决方案,但是它们的执行类型不同。

信息:

为了便于说明,我将使用以下信息:


表有700万条记录(行)
AggregationStrategyImpl用以下内容扩展AggregationStrategy


在交换正文中返回List<Object>
Predicate完成聚合List<Object> >= 50000
聚合超时设置为30000毫秒

CustomThreadPool是Camel的ThreadPoolBuilder类的伪实现:


泳池大小:100
最大池大小:50000
最大队列大小:500
时间单位:MILLISECONDS
KeepAlive时间:30000

两种实现都在自动接线


解决方案1:

from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")


该代码仍将在Quartz cron-timer上运行(每天00:01),但是这次我的 QueryTable.class将获取要执行的正确查询(而不是 SELECT *,我现在指定了我需要的列)并将其设置为交换机构。

.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")


骆驼 jdbc组件将从交换主体获取查询,将 resetAutoCommit设置为false,这样就不会抛出 Cursor not open错误,将输出设置为流化并分流执行,因此我不会一次查询所有记录,而不是一一查询。然后,每个获取的记录将通过 TransformRecord.class转换为适当的POJO。

.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();


这次,我使用 aggregate组件创建记录列表。 aggregationStrategyImpl包含用于聚集以及完成谓词和超时的逻辑,因此,当我达到一定数量的记录(或发生超时)时,该列表将发送到“ direct:start-processing”。

有关此源盟友 blog和Apache Camel Aggregate EIP文档中的聚合实现的更多信息。

from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();


在这里,我拆分了获得的列表,并使用自定义ThreadPool创建了N个线程,以分别分析和处理每个记录。这样,我可以并行处理而不是一个接一个地处理列表。我本可以使用 .split(body()).parallelProcessing(),但是以后默认的ThreadPool设置可能不是最佳的。

有关Apache Camel Threading Model文档, ThreadPool Configuration注释和Red Hat Threading Model文档上ThreadPool实现的更多信息。

解决方案2:

对于此解决方案,基本上是完全相同的执行,但具有以下更改:

// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally


该操作将异步发送要处理的列表,从而允许最多1个其他列表在内存中排队,如果队列已满,则暂停父线程。因此,父线程将返回并收集另一批记录,而不是等待记录列表被处理。这也意味着,如果处理路由尚未完成,新记录将不会被抛出,并且父线程将等待,直到它可以将批处理发送到SEDA内存队列中。

GitHub及其 site中的Apache Camel SEDA Component文档中有关SEDA组件的更多信息。

结论:

对于解决方案1,由于它首先要处理所有数据,然后再从查询中收集更多记录,因此完成该过程所花费的时间会更长,但是由于它是在聚合谓词中进行控制的,因此内存消耗应该少得多。

使用解决方案2时,它应该快得多,因为它在处理前一批数据时正在从查询中收集下一批记录,但是内存消耗将更大,因为它最多可以容纳3个列表:一个正在处理,一个在列表中。在SEDA队列中,以及由父线程收集的最新批处理(在队列已满时暂停)。

我说我仍在测试这些解决方案,因为它可以记录500k记录,但是我仍在为实现该记录的服务器制定最佳的ThreadPool设置。我研究了Java中的线程处理,但是似乎除了系统的体系结构,RAM和反复试验之外,实际上没有什么可比的。

关于java - 与Camel并行处理大型SQL表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55407892/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com