Spring Batch CompositeItemWriter 如何管理委托编写器的事务?

How does Spring Batch CompositeItemWriter manage transaction for delegate writers?

在batch job step configuration中,我计划在writer中执行2个查询,第一个查询是更新table A中的记录,然后第二个查询是在[=21中插入新记录=]又是A.

至此我认为CompositeItemWriter可以实现我上面的目标,即我需要创建2个JdbcBatchItemWriters,一个用于更新,一个用于插入。

我的第一个问题是 CompositeItemWriter 是否符合上述要求?

如果是,则引出第二个交易问题。例如,如果第一次更新成功,而第二次插入失败。第一次更新事务会自动回滚吗?否则,如何在同一个事务中手动拉取两个更新?

提前致谢!

My first question is if CompositeItemWriter is a fit for the requirement above?

是的,CompositeItemWriter 是正确的选择。

If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?

问得好!是的,如果第一个写入器更新成功,然后第二个写入器插入失败,所有语句将自动回滚。您需要知道的是事务是围绕面向块的 tasklet 步骤的执行(因此围绕复合项编写器的 write 方法)。因此,此方法中所有 sql 语句的执行(在委托编写器中执行)将是原子的。

为了说明这个用例,我编写了以下测试:

  • 给定一个包含两列 idname 的 table people,其中只有一条记录:1,'foo'
  • 让我们想象一个作业读取两条记录 (1,'foo', 2,'bar') 并尝试将 foo 更新为 foo!! 然后插入 2,'bar' table。这是通过 CompositeItemWriter 和两个项目编写器完成的:UpdateItemWriterInsertItemWriter
  • 用例是 UpdateItemWriter 成功但 InsertItemWriter 失败(通过抛出异常)
  • 预期的结果是 foo 没有更新为 foo!! 并且 bar 没有插入 table (两个 sql 语句都被滚动由于 InsertItemWriter)
  • 中的异常返回

这是代码(它是独立的,所以你可以试试看它是如何工作的,它使用一个嵌入式 hsqldb 数据库,它应该在你的类路径中):

import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Before
    public void setUp() {
        jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
        jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
    }

    @Test
    public void testTransactionRollbackWithCompositeWriter() throws Exception {
        // given
        int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount);
        Assert.assertEquals(1, fooCount);
        Assert.assertEquals(0, barCount);

        // when
        JobExecution jobExecution = jobLauncherTestUtils.launchJob();

        // then
        Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
        Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
        StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
        Assert.assertEquals(0, stepExecution.getCommitCount());
        Assert.assertEquals(1, stepExecution.getRollbackCount());
        Assert.assertEquals(0, stepExecution.getWriteCount());

        peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
        fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
        barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
        Assert.assertEquals(1, peopleCount); // bar is not inserted
        Assert.assertEquals(0, barCount); // bar is not inserted
        Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
    }

    @Configuration
    @EnableBatchProcessing
    public static class JobConfiguration {

        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseBuilder()
                    .setType(EmbeddedDatabaseType.HSQL)
                    .addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
                    .addScript("/org/springframework/batch/core/schema-hsqldb.sql")
                    .build();
        }

        @Bean
        public JdbcTemplate jdbcTemplate(DataSource dataSource) {
            return new JdbcTemplate(dataSource);
        }

        @Bean
        public ItemReader<Person> itemReader() {
            Person foo = new Person(1, "foo");
            Person bar = new Person(2, "bar");
            return new ListItemReader<>(Arrays.asList(foo, bar));
        }

        @Bean
        public ItemWriter<Person> updateItemWriter() {
            return new UpdateItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> insertItemWriter() {
            return new InsertItemWriter(dataSource());
        }

        @Bean
        public ItemWriter<Person> itemWriter() {
            CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
            compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
            return compositeItemWriter;
        }

        @Bean
        public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
            return jobBuilderFactory.get("job")
                    .start(stepBuilderFactory
                            .get("step").<Person, Person>chunk(2)
                            .reader(itemReader())
                            .writer(itemWriter())
                            .build())
                    .build();
        }

        @Bean
        public JobLauncherTestUtils jobLauncherTestUtils() {
            return new JobLauncherTestUtils();
        }
    }

    public static class UpdateItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public UpdateItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("foo".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
                }
            }
        }
    }

    public static class InsertItemWriter implements ItemWriter<Person> {

        private JdbcTemplate jdbcTemplate;

        public InsertItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends Person> items) {
            for (Person person : items) {
                if ("bar".equalsIgnoreCase(person.getName())) {
                    jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
                    throw new IllegalStateException("Something went wrong!");
                }
            }
        }
    }

    public static class Person {

        private long id;

        private String name;

        public Person() {
        }

        public Person(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() {
            return id;
        }

        public void setId(long id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }
    }
}

我的示例使用自定义项目编写器,但这也适用于两个 JdbcBatchItemWriter

希望对您有所帮助!