
spring-batch - How does Spring Batch CompositeItemWriter manage transactions for delegate writers?

Reposted. Author: 行者123. Updated: 2023-12-03 21:22:39

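In a batch job step configuration, I plan to execute two queries in the writer: the first query updates records in table A, and the second query then inserts new records into table A.

So far I think CompositeItemWriter can achieve this goal, i.e. I need to create two JdbcBatchItemWriters, one for the update and one for the insert.

My first question is whether CompositeItemWriter is a fit for the requirement above.

If yes, that leads to a second question about transactions. For example, suppose the first update succeeds and the second insert fails. Will the first update be rolled back automatically? If not, how can I manually put both statements in the same transaction?

Thanks in advance!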

Best answer

My first question is if CompositeItemWriter is a fit for the requirement above?



Yes, CompositeItemWriter is the way to go.

If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?



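Good question! Yes, if the update succeeds in the first writer and the insert then fails in the second writer, all statements are rolled back automatically. What you need to know is that the transaction surrounds the execution of the chunk-oriented tasklet step, and therefore surrounds the write method of the composite item writer. As a result, all SQL statements executed within this method (that is, in the delegate writers) are atomic.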
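As a rough mental model of why this is atomic, the sketch below mimics the idea in plain Java with no Spring dependency. It is not Spring Batch code: `ChunkTransactionSketch`, `Writer`, `composite` and `writeChunk` are hypothetical names, and the "transaction" is simulated by writing to a copy of an in-memory map that replaces the committed state only if every delegate returns normally.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: this is NOT Spring Batch code. All names are hypothetical.
class ChunkTransactionSketch {

    /** Stand-in for an ItemWriter: writes a chunk of items into a working copy of the table. */
    interface Writer {
        void write(List<String> items, Map<Integer, String> table);
    }

    /** Stand-in for CompositeItemWriter: calls each delegate in order, with no transaction logic of its own. */
    static Writer composite(List<Writer> delegates) {
        return (items, table) -> {
            for (Writer delegate : delegates) {
                delegate.write(items, table);
            }
        };
    }

    /**
     * Stand-in for the per-chunk transaction of the step: the writer works on a copy,
     * which replaces the committed state only if write() returns normally.
     * Returns true on commit and false on rollback.
     */
    static boolean writeChunk(Map<Integer, String> committed, Writer writer, List<String> chunk) {
        Map<Integer, String> working = new HashMap<>(committed); // "begin transaction"
        try {
            writer.write(chunk, working);
        } catch (RuntimeException e) {
            return false; // "rollback": the working copy is discarded
        }
        committed.clear();
        committed.putAll(working); // "commit"
        return true;
    }

    public static void main(String[] args) {
        Map<Integer, String> people = new HashMap<>();
        people.put(1, "foo");

        Writer update = (items, table) -> table.put(1, "foo!!");
        Writer failingInsert = (items, table) -> {
            table.put(2, "bar");
            throw new IllegalStateException("Something went wrong!");
        };

        boolean committed = writeChunk(people,
                composite(Arrays.asList(update, failingInsert)),
                Arrays.asList("foo", "bar"));
        System.out.println(committed + " " + people); // prints "false {1=foo}"
    }
}
```

The point of the model is that the composite writer itself knows nothing about transactions: the boundary sits one level up, around the whole write call, so a failure in any delegate discards the work of all of them.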

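To illustrate this use case, I wrote the following test:
  • Given a table people with two columns, id and name, containing a single record: 1,'foo'
  • Assume a job that reads two records (1,'foo' and 2,'bar') and tries to update foo to foo!! and then insert 2,'bar' into the table. This is done with a CompositeItemWriter that has two delegate item writers: UpdateItemWriter and InsertItemWriter
  • The use case is that UpdateItemWriter succeeds but InsertItemWriter fails (by throwing an exception)
  • The expected result is that foo is not updated to foo!! and bar is not inserted into the table (both SQL statements are rolled back because of the exception in InsertItemWriter)

Here is the code (it is self-contained, so you can try it and see how it works; it uses an embedded HSQLDB database, which should be on your classpath):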
    import java.util.Arrays;
    import java.util.List;
    import javax.sql.DataSource;

    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;

    import org.springframework.batch.core.ExitStatus;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.JobExecution;
    import org.springframework.batch.core.StepExecution;
    import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
    import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
    import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
    import org.springframework.batch.item.ItemReader;
    import org.springframework.batch.item.ItemWriter;
    import org.springframework.batch.item.support.CompositeItemWriter;
    import org.springframework.batch.item.support.ListItemReader;
    import org.springframework.batch.test.JobLauncherTestUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.jdbc.core.JdbcTemplate;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
    import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.test.jdbc.JdbcTestUtils;

    @RunWith(SpringRunner.class)
    @ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
    public class TransactionWithCompositeWriterTest {

        @Autowired
        private JobLauncherTestUtils jobLauncherTestUtils;

        @Autowired
        private JdbcTemplate jdbcTemplate;

        @Before
        public void setUp() {
            jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
            jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
        }

        @Test
        public void testTransactionRollbackWithCompositeWriter() throws Exception {
            // given
            int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
            int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
            int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
            Assert.assertEquals(1, peopleCount);
            Assert.assertEquals(1, fooCount);
            Assert.assertEquals(0, barCount);

            // when
            JobExecution jobExecution = jobLauncherTestUtils.launchJob();

            // then
            Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
            Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
            StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
            Assert.assertEquals(0, stepExecution.getCommitCount());
            Assert.assertEquals(1, stepExecution.getRollbackCount());
            Assert.assertEquals(0, stepExecution.getWriteCount());

            peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
            fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
            barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
            Assert.assertEquals(1, peopleCount); // bar is not inserted
            Assert.assertEquals(0, barCount); // bar is not inserted
            Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
        }

        @Configuration
        @EnableBatchProcessing
        public static class JobConfiguration {

            @Bean
            public DataSource dataSource() {
                return new EmbeddedDatabaseBuilder()
                        .setType(EmbeddedDatabaseType.HSQL)
                        .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                        .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                        .build();
            }

            @Bean
            public JdbcTemplate jdbcTemplate(DataSource dataSource) {
                return new JdbcTemplate(dataSource);
            }

            @Bean
            public ItemReader<Person> itemReader() {
                Person foo = new Person(1, "foo");
                Person bar = new Person(2, "bar");
                return new ListItemReader<>(Arrays.asList(foo, bar));
            }

            @Bean
            public ItemWriter<Person> updateItemWriter() {
                return new UpdateItemWriter(dataSource());
            }

            @Bean
            public ItemWriter<Person> insertItemWriter() {
                return new InsertItemWriter(dataSource());
            }

            @Bean
            public ItemWriter<Person> itemWriter() {
                CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
                compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
                return compositeItemWriter;
            }

            @Bean
            public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
                return jobBuilderFactory.get("job")
                        .start(stepBuilderFactory
                                .get("step").<Person, Person>chunk(2)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .build())
                        .build();
            }

            @Bean
            public JobLauncherTestUtils jobLauncherTestUtils() {
                return new JobLauncherTestUtils();
            }
        }

        // First delegate: updates the existing row and always succeeds.
        public static class UpdateItemWriter implements ItemWriter<Person> {

            private JdbcTemplate jdbcTemplate;

            public UpdateItemWriter(DataSource dataSource) {
                this.jdbcTemplate = new JdbcTemplate(dataSource);
            }

            @Override
            public void write(List<? extends Person> items) {
                for (Person person : items) {
                    if ("foo".equalsIgnoreCase(person.getName())) {
                        jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
                    }
                }
            }
        }

        // Second delegate: issues the insert, then fails on purpose to trigger the rollback.
        public static class InsertItemWriter implements ItemWriter<Person> {

            private JdbcTemplate jdbcTemplate;

            public InsertItemWriter(DataSource dataSource) {
                this.jdbcTemplate = new JdbcTemplate(dataSource);
            }

            @Override
            public void write(List<? extends Person> items) {
                for (Person person : items) {
                    if ("bar".equalsIgnoreCase(person.getName())) {
                        jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
                        // Simulated failure after the insert statement has been executed.
                        throw new IllegalStateException("Something went wrong!");
                    }
                }
            }
        }

        public static class Person {

            private long id;

            private String name;

            public Person() {
            }

            public Person(long id, String name) {
                this.id = id;
                this.name = name;
            }

            public long getId() {
                return id;
            }

            public void setId(long id) {
                this.id = id;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }
        }
    }

My example uses custom item writers, but this should work with two JdbcBatchItemWriters as well.

I hope this helps!
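For the variant with two JdbcBatchItemWriters, the configuration might look roughly like the sketch below. It is untested and rests on several assumptions: the class name `JdbcCompositeWriterConfiguration`, the bean names and both SQL statements are placeholders, `Person` is the class from the test above (its `id` and `name` properties feed the named parameters through `beanMapped()`), and both writers use the same DataSource that the step's transaction manager manages.

```java
import java.util.Arrays;
import javax.sql.DataSource;

import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Sketch only: names and SQL are placeholders, Person is the class from the test above.
@Configuration
public class JdbcCompositeWriterConfiguration {

    @Bean
    public JdbcBatchItemWriter<Person> updateWriter(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource)
                .sql("UPDATE people SET name = :name WHERE id = :id")
                .beanMapped()
                // By default the writer fails if a statement updates no row;
                // disabled here on the assumption that some items match nothing.
                .assertUpdates(false)
                .build();
    }

    @Bean
    public JdbcBatchItemWriter<Person> insertWriter(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .dataSource(dataSource)
                .sql("INSERT INTO people (id, name) VALUES (:id, :name)")
                .beanMapped()
                .build();
    }

    @Bean
    public CompositeItemWriter<Person> compositeWriter(DataSource dataSource) {
        CompositeItemWriter<Person> writer = new CompositeItemWriter<>();
        // Delegates run in list order inside the chunk transaction of the step.
        writer.setDelegates(Arrays.asList(updateWriter(dataSource), insertWriter(dataSource)));
        return writer;
    }
}
```

The composite writer would then be passed to the step's `.writer(...)` call exactly as `itemWriter()` is in the test.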

Regarding "spring-batch - How does Spring Batch CompositeItemWriter manage transactions for delegate writers?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51904498/
