
java - Spring Batch: Tasklet with multi threaded executor has very bad performances related to Throttling algorithm


Using Spring Batch 2.2.1, I have configured a Spring Batch job with the following approach.

The configuration is as follows:

  • The tasklet uses a ThreadPoolTaskExecutor limited to 15 threads

  • throttle-limit is equal to the number of threads

  • The chunk uses:

    • A synchronized adapter around the single JdbcCursorItemReader, allowing it to be used by multiple threads as recommended by the Spring Batch documentation (a minimal sketch of such an adapter is shown after this list):

      You can synchronize the call to read() and as long as the processing and writing is the most expensive part of the chunk your step may still complete much faster than in a single threaded configuration.

    • saveState set to false on the JdbcCursorItemReader

    • A custom, JPA-based ItemWriter. Note that the processing time of a single item can vary from a few milliseconds to several seconds (> 60 s).

    • commit-interval set to 1 (I know it could be higher, but that is not the issue here)

  • All the JDBC pools are fine, sized according to the Spring Batch documentation recommendations
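For reference, the synchronized adapter around the reader can be as small as the sketch below. This is a minimal illustration, not the original poster's class: the class name is made up, and all it does is serialize read() calls on the shared JdbcCursorItemReader.

import org.springframework.batch.item.ItemReader;

/**
 * Minimal synchronizing delegate (illustrative): serializes calls to read()
 * so that a single JdbcCursorItemReader can be shared by the worker threads.
 */
public class SynchronizingItemReader<T> implements ItemReader<T> {

    private final ItemReader<T> delegate;

    public SynchronizingItemReader(ItemReader<T> delegate) {
        this.delegate = delegate;
    }

    public synchronized T read() throws Exception {
        // Only cursor access is serialized; processing and writing still run
        // concurrently on the step's worker threads.
        return delegate.read();
    }
}

Note that such a wrapper does not implement ItemStream, so the underlying JdbcCursorItemReader still has to be registered as a stream on the tasklet so that it is opened and closed properly.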

Running the batch leads to very strange and bad results, for the following reason:

  • At some point, if the items take a while to be processed by a writer, nearly all the threads in the thread pool end up doing nothing instead of processing; only the slow writer is working.

Looking at the Spring Batch code, the root cause seems to be in this package:

  • org/springframework/batch/repeat/support/

Is this way of working a feature, or is it a limitation/bug?

If it is a feature, what is the way, by configuration, to keep all the threads from being starved by the long-running processing work, without having to rewrite everything?

Note that if all items take the same amount of time, everything works fine and multi-threading is OK; but if one of the item processings takes much longer, then multi-threading is nearly useless for the duration of the slow processing.

Note that I have opened this issue:

Best Answer

As Alex said, it seems this behaviour is a contract, as per the javadocs:

Subclasses just need to provide a method that gets the next result and one that waits for all the results to be returned from concurrent processes or threads

See:

TaskExecutorRepeatTemplate#waitForResults
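To make the starvation pattern concrete, here is a small standalone illustration using plain java.util.concurrent (this is not Spring Batch's internal code): new work is only handed out once every outstanding result of the current batch has come back, so one slow task keeps the rest of the pool idle.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Standalone illustration of the starvation pattern described above:
// work is submitted up to a limit, then the producer blocks until every
// outstanding result is back before submitting anything else.
public class ThrottleStarvationDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int throttleLimit = 4;

        for (int round = 0; round < 3; round++) {
            List<Future<?>> inFlight = new ArrayList<Future<?>>();
            for (int i = 0; i < throttleLimit; i++) {
                final long workMillis = (i == 0) ? 5000 : 50; // one "slow writer"
                inFlight.add(pool.submit(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(workMillis);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }));
            }
            // While the slow task runs, the three fast threads sit idle:
            // nothing new is submitted until all futures have completed.
            for (Future<?> f : inFlight) {
                f.get();
            }
        }
        pool.shutdown();
    }
}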

Your other option is to use partitioning:

  • A TaskExecutorPartitionHandler that will execute items from a partitioned ItemReader, see below
  • A Partitioner implementation that provides the ranges the ItemReader has to process, see ColumnRangePartitioner below
  • A custom reader that will read the data using what the Partitioner has filled in, see the myItemReader configuration below

Michael Minella explains this in Chapter 11 of his book Pro Spring Batch:

<batch:job id="batchWithPartition">
    <batch:step id="step1.master">
        <batch:partition partitioner="myPartitioner" handler="partitionHandler"/>
    </batch:step>
</batch:job>

<!-- This one will create partitions of (number of lines / grid size) -->
<bean id="myPartitioner" class="....ColumnRangePartitioner"/>

<!-- This one will handle every partition in a thread -->
<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="multiThreadedTaskExecutor"/>
    <property name="step" ref="step1" />
    <property name="gridSize" value="10" />
</bean>

<batch:step id="step1">
    <batch:tasklet transaction-manager="transactionManager">
        <batch:chunk reader="myItemReader"
                     writer="manipulatableWriterForTests" commit-interval="1"
                     skip-limit="30000">
            <batch:skippable-exception-classes>
                <batch:include class="java.lang.Exception" />
            </batch:skippable-exception-classes>
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<!-- Step scope is critical here -->
<bean id="myItemReader"
      class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
    <property name="dataSource" ref="dataSource"/>
    <property name="sql">
        <value>
            <![CDATA[
                select * from customers where id >= ? and id <= ?
            ]]>
        </value>
    </property>
    <property name="preparedStatementSetter">
        <bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
            <property name="parameters">
                <list>
                    <!-- minValue and maxValue are filled in by the Partitioner for each partition, via the step ExecutionContext -->
                    <value>#{stepExecutionContext[minValue]}</value>
                    <value>#{stepExecutionContext[maxValue]}</value>
                </list>
            </property>
        </bean>
    </property>
    <property name="rowMapper" ref="customerRowMapper"/>
</bean>

Partitioner.java:

package ...;

import java.util.HashMap;
import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;

public class ColumnRangePartitioner implements Partitioner {

    // A JdbcTemplate is needed to run the MIN/MAX queries; wire it via a DataSource.
    private JdbcTemplate jdbcTemplate;
    private String column;
    private String table;

    public Map<String, ExecutionContext> partition(int gridSize) {
        int min = jdbcTemplate.queryForInt("SELECT MIN(" + column + ") from " + table);
        int max = jdbcTemplate.queryForInt("SELECT MAX(" + column + ") from " + table);
        int targetSize = (max - min) / gridSize;
        System.out.println("Our partition size will be " + targetSize);
        System.out.println("We will have " + gridSize + " partitions");

        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        int number = 0;
        int start = min;
        int end = start + targetSize - 1;
        while (start <= max) {
            // One ExecutionContext per partition, carrying the id range for the reader
            ExecutionContext value = new ExecutionContext();
            result.put("partition" + number, value);
            if (end >= max) {
                end = max;
            }
            value.putInt("minValue", start);
            value.putInt("maxValue", end);
            System.out.println("minValue = " + start);
            System.out.println("maxValue = " + end);
            start += targetSize;
            end += targetSize;
            number++;
        }
        System.out.println("We are returning " + result.size() + " partitions");
        return result;
    }

    public void setColumn(String column) {
        this.column = column;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public void setDataSource(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }
}
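As a quick sanity check, the partitioner can also be run on its own, outside of a job. The snippet below is illustrative only: the DataSource is assumed to point at the same database, the table/column values match the SQL used by myItemReader above, and setDataSource is the setter from the listing above.

import java.util.Map;

import javax.sql.DataSource;

import org.springframework.batch.item.ExecutionContext;

// Illustrative check of the partitioner output; not part of the original post.
public class ColumnRangePartitionerCheck {

    public static void printPartitions(DataSource dataSource) {
        ColumnRangePartitioner partitioner = new ColumnRangePartitioner();
        partitioner.setDataSource(dataSource);
        partitioner.setTable("customers");
        partitioner.setColumn("id");

        // One [minValue, maxValue] range per partition key; each range ends up
        // in the step ExecutionContext of one partitioned step execution.
        Map<String, ExecutionContext> partitions = partitioner.partition(10);
        for (Map.Entry<String, ExecutionContext> entry : partitions.entrySet()) {
            System.out.println(entry.getKey() + " -> ["
                    + entry.getValue().getInt("minValue") + ", "
                    + entry.getValue().getInt("maxValue") + "]");
        }
    }
}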

This question, java - Spring Batch: Tasklet with multi threaded executor has very bad performances related to Throttling algorithm, was originally asked on Stack Overflow: https://stackoverflow.com/questions/18262857/
