gpt4 book ai didi

spring - 在 Spring Batch 中,是否可以配置多个 JdbcBatchItemWriter 并行写入?

转载 作者:行者123 更新时间:2023-12-04 18:45:26 33 4
gpt4 key购买 nike

在我的 Spring 批处理作业中,我的项目处理器将项目阅读器读取的对象拆分为七个可变长度的列表。这些列表必须写入数据库中的七个表,并且任何错误(例如数据库因任何原因拒绝记录)都必须导致事务在所有七个表上回滚。

目前,我使用传递给自定义项目编写器的这七个列表创建了一个包装对象。编写器获取所有这些项目,创建自己的七个列表,以便它对项目处理器返回的一批包装对象只有七个批量写入(使用基于 JdbcTemplate 的 DAO)。

我的作者按顺序为这些表中的每一个调用插入函数,我想加快速度。我想知道我是否可以将列表并行写入它们各自的表,以便总执行时间是最长写入的时间。我不能妥协的一个要求是,这必须在单个事务中,如果任何作者有任何异常(exception),都需要回滚。

最佳答案

这是一个使用 TaskExecutor 并在 org.springframework.batch.item.support.CompositeItemWriter 上扩展的简单解决方案.

package de.incompleteco.spring.batch.item.support;

import java.util.List;

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.core.task.TaskExecutor;
import org.springframework.util.Assert;

import de.incompleteco.spring.domain.SimpleEntity;

public class ParallelCompositeItemWriter extends CompositeItemWriter<SimpleEntity> {

private List<ItemWriter<? super SimpleEntity>> delegates;

private TaskExecutor taskExecutor;

@Override
public void write(final List<? extends SimpleEntity> item) throws Exception {
for (final ItemWriter<? super SimpleEntity> writer : delegates) {
taskExecutor.execute(new Runnable() {
@Override
public void run() {
try {
writer.write(item);
} catch (Throwable t) {
rethrow(t);
}
}

private void rethrow(Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
throw new IllegalStateException(t);
}
});
}//end for
}


public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}

@Override
public void setDelegates(List<ItemWriter<? super SimpleEntity>> delegates) {
this.delegates = delegates;
super.setDelegates(delegates);
}

@Override
public void afterPropertiesSet() throws Exception {
super.afterPropertiesSet();
Assert.notNull(taskExecutor,"Task executor needs to be set");
}



}

一个示例配置看起来像这样;
<batch:job id="simpleJob">
<batch:step id="simpleJob.step1">
<batch:tasklet>
<batch:chunk reader="reader" writer="writer" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>

<bean id="reader" class="org.springframework.batch.item.support.IteratorItemReader">
<constructor-arg ref="itemList"/>
</bean>

<bean id="writer" class="de.incompleteco.spring.batch.item.support.ParallelCompositeItemWriter">
<property name="delegates" ref="writerDelegates"/>
<property name="taskExecutor" ref="writerTaskExecutor"/>
</bean>

<util:list id="writerDelegates">
<bean class="org.springframework.batch.item.database.JdbcBatchItemWriter">
<property name="dataSource" ref="dataSource1"/>
<property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/>
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
</property>
</bean>
<bean class="org.springframework.batch.item.database.JdbcBatchItemWriter">
<property name="dataSource" ref="dataSource2"/>
<property name="sql" value="insert into test_table (idcol,stuff) values (:idCol,:stuff)"/>
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
</property>
</bean>
</util:list>

<util:list id="itemList">
<bean class="de.incompleteco.spring.domain.SimpleEntity">
<constructor-arg value="stuff1"/>
</bean>
<bean class="de.incompleteco.spring.domain.SimpleEntity">
<constructor-arg value="stuff2"/>
</bean>
<bean class="de.incompleteco.spring.domain.SimpleEntity">
<constructor-arg value="stuff3"/>
</bean>
</util:list>

<task:executor id="writerTaskExecutor" pool-size="3"/>


<bean id="dataSource1" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close">
<property name="className" value="org.h2.jdbcx.JdbcDataSource" />
<property name="uniqueName" value="#{T(System).currentTimeMillis()}" />
<property name="allowLocalTransactions" value="true"/>
<property name="maxPoolSize" value="2" />
<property name="driverProperties">
<props>
<prop key="URL">jdbc:h2:mem:a;DB_CLOSE_DELAY=-1</prop>
</props>
</property>
</bean>

<bean id="dataSource2" class="bitronix.tm.resource.jdbc.PoolingDataSource" init-method="init" destroy-method="close">
<property name="className" value="org.h2.jdbcx.JdbcDataSource" />
<property name="uniqueName" value="#{T(System).currentTimeMillis()}" />
<property name="allowLocalTransactions" value="true"/>
<property name="maxPoolSize" value="2" />
<property name="driverProperties">
<props>
<prop key="URL">jdbc:h2:mem:b;DB_CLOSE_DELAY=-1</prop>
</props>
</property>
</bean>

<jdbc:initialize-database data-source="dataSource1">
<jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
</jdbc:initialize-database>

<jdbc:initialize-database data-source="dataSource2">
<jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
</jdbc:initialize-database>
<!-- XA transaction -->

<bean id="btmConfig" factory-method="getConfiguration" class="bitronix.tm.TransactionManagerServices"/>

<bean id="BitronixTransactionManager" factory-method="getTransactionManager"
class="bitronix.tm.TransactionManagerServices" depends-on="btmConfig" destroy-method="shutdown" />

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
<property name="transactionManager" ref="BitronixTransactionManager" />
<property name="userTransaction" ref="BitronixTransactionManager" />
</bean>

此示例使用以下内容;
  • Bitronix JTA 支持跨多个数据库的事务
  • 一个非常简单的模型,将一个简单的实体转化为一个简单的 jdbc 记录

  • (数据库中的东西很粗糙,只是一个例子)

    关于spring - 在 Spring Batch 中,是否可以配置多个 JdbcBatchItemWriter 并行写入?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15030031/

    33 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com