Spring Batch CompositeItemWriter 如何管理委托编写器的事务?
How does Spring Batch CompositeItemWriter manage transaction for delegate writers?
在batch job step configuration中,我计划在writer中执行2个查询,第一个查询是更新table A中的记录,然后第二个查询是在[=21中插入新记录=]又是A.
至此我认为CompositeItemWriter可以实现我上面的目标,即我需要创建2个JdbcBatchItemWriters,一个用于更新,一个用于插入。
我的第一个问题是 CompositeItemWriter 是否符合上述要求?
如果是,则引出第二个交易问题。例如,如果第一次更新成功,而第二次插入失败。第一次更新事务会自动回滚吗?否则,如何在同一个事务中手动拉取两个更新?
提前致谢!
My first question is if CompositeItemWriter is a fit for the requirement above?
是的,CompositeItemWriter
是正确的选择。
If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?
问得好!是的,如果第一个写入器更新成功,然后第二个写入器插入失败,所有语句将自动回滚。您需要知道的是事务是围绕面向块的 tasklet 步骤的执行(因此围绕复合项编写器的 write
方法)。因此,此方法中所有 sql 语句的执行(在委托编写器中执行)将是原子的。
为了说明这个用例,我编写了以下测试:
- 给定一个包含两列
id
和 name
的 table people
,其中只有一条记录:1,'foo'
- 让我们想象一个作业读取两条记录 (
1,'foo'
, 2,'bar'
) 并尝试将 foo
更新为 foo!!
然后插入 2,'bar'
table。这是通过 CompositeItemWriter
和两个项目编写器完成的:UpdateItemWriter
和 InsertItemWriter
- 用例是
UpdateItemWriter
成功但 InsertItemWriter
失败(通过抛出异常)
- 预期的结果是
foo
没有更新为 foo!!
并且 bar
没有插入 table (两个 sql 语句都被滚动由于 InsertItemWriter
) 中的异常返回
这是代码(它是独立的,所以你可以试试看它是如何工作的,它使用一个嵌入式 hsqldb 数据库,它应该在你的类路径中):
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Before
public void setUp() {
jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
}
@Test
public void testTransactionRollbackWithCompositeWriter() throws Exception {
// given
int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount);
Assert.assertEquals(1, fooCount);
Assert.assertEquals(0, barCount);
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
// then
Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
Assert.assertEquals(0, stepExecution.getCommitCount());
Assert.assertEquals(1, stepExecution.getRollbackCount());
Assert.assertEquals(0, stepExecution.getWriteCount());
peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount); // bar is not inserted
Assert.assertEquals(0, barCount); // bar is not inserted
Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
}
@Configuration
@EnableBatchProcessing
public static class JobConfiguration {
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public ItemReader<Person> itemReader() {
Person foo = new Person(1, "foo");
Person bar = new Person(2, "bar");
return new ListItemReader<>(Arrays.asList(foo, bar));
}
@Bean
public ItemWriter<Person> updateItemWriter() {
return new UpdateItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> insertItemWriter() {
return new InsertItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> itemWriter() {
CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
return compositeItemWriter;
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory
.get("step").<Person, Person>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
}
public static class UpdateItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public UpdateItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("foo".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
}
}
}
}
public static class InsertItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public InsertItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("bar".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
throw new IllegalStateException("Something went wrong!");
}
}
}
}
public static class Person {
private long id;
private String name;
public Person() {
}
public Person(long id, String name) {
this.id = id;
this.name = name;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
我的示例使用自定义项目编写器,但这也适用于两个 JdbcBatchItemWriter
。
希望对您有所帮助!
在batch job step configuration中,我计划在writer中执行2个查询,第一个查询是更新table A中的记录,然后第二个查询是在[=21中插入新记录=]又是A.
至此我认为CompositeItemWriter可以实现我上面的目标,即我需要创建2个JdbcBatchItemWriters,一个用于更新,一个用于插入。
我的第一个问题是 CompositeItemWriter 是否符合上述要求?
如果是,则引出第二个交易问题。例如,如果第一次更新成功,而第二次插入失败。第一次更新事务会自动回滚吗?否则,如何在同一个事务中手动拉取两个更新?
提前致谢!
My first question is if CompositeItemWriter is a fit for the requirement above?
是的,CompositeItemWriter
是正确的选择。
If yes, that lead to the second question about transaction. For example, if the first update is successful, and the second insert fails. Will the 1st update transaction be rolled back automatically? Otherwise, how to manually pull both updates in the same transaction?
问得好!是的,如果第一个写入器更新成功,然后第二个写入器插入失败,所有语句将自动回滚。您需要知道的是事务是围绕面向块的 tasklet 步骤的执行(因此围绕复合项编写器的 write
方法)。因此,此方法中所有 sql 语句的执行(在委托编写器中执行)将是原子的。
为了说明这个用例,我编写了以下测试:
- 给定一个包含两列
id
和name
的 tablepeople
,其中只有一条记录:1,'foo'
- 让我们想象一个作业读取两条记录 (
1,'foo'
,2,'bar'
) 并尝试将foo
更新为foo!!
然后插入2,'bar'
table。这是通过CompositeItemWriter
和两个项目编写器完成的:UpdateItemWriter
和InsertItemWriter
- 用例是
UpdateItemWriter
成功但InsertItemWriter
失败(通过抛出异常) - 预期的结果是
foo
没有更新为foo!!
并且bar
没有插入 table (两个 sql 语句都被滚动由于InsertItemWriter
) 中的异常返回
这是代码(它是独立的,所以你可以试试看它是如何工作的,它使用一个嵌入式 hsqldb 数据库,它应该在你的类路径中):
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.jdbc.JdbcTestUtils;
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = TransactionWithCompositeWriterTest.JobConfiguration.class)
public class TransactionWithCompositeWriterTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
@Before
public void setUp() {
jdbcTemplate.update("CREATE TABLE people (id INT IDENTITY NOT NULL PRIMARY KEY, name VARCHAR(20));");
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (1, 'foo');");
}
@Test
public void testTransactionRollbackWithCompositeWriter() throws Exception {
// given
int peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
int fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
int barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount);
Assert.assertEquals(1, fooCount);
Assert.assertEquals(0, barCount);
// when
JobExecution jobExecution = jobLauncherTestUtils.launchJob();
// then
Assert.assertEquals(ExitStatus.FAILED.getExitCode(), jobExecution.getExitStatus().getExitCode());
Assert.assertEquals("Something went wrong!", jobExecution.getAllFailureExceptions().get(0).getMessage());
StepExecution stepExecution = jobExecution.getStepExecutions().iterator().next();
Assert.assertEquals(0, stepExecution.getCommitCount());
Assert.assertEquals(1, stepExecution.getRollbackCount());
Assert.assertEquals(0, stepExecution.getWriteCount());
peopleCount = JdbcTestUtils.countRowsInTable(jdbcTemplate, "people");
fooCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 1 and name = 'foo'");
barCount = JdbcTestUtils.countRowsInTableWhere(jdbcTemplate, "people", "id = 2 and name = 'bar'");
Assert.assertEquals(1, peopleCount); // bar is not inserted
Assert.assertEquals(0, barCount); // bar is not inserted
Assert.assertEquals(1, fooCount); // foo is not updated to "foo!!"
}
@Configuration
@EnableBatchProcessing
public static class JobConfiguration {
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("/org/springframework/batch/core/schema-drop-hsqldb.sql")
.addScript("/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
@Bean
public ItemReader<Person> itemReader() {
Person foo = new Person(1, "foo");
Person bar = new Person(2, "bar");
return new ListItemReader<>(Arrays.asList(foo, bar));
}
@Bean
public ItemWriter<Person> updateItemWriter() {
return new UpdateItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> insertItemWriter() {
return new InsertItemWriter(dataSource());
}
@Bean
public ItemWriter<Person> itemWriter() {
CompositeItemWriter<Person> compositeItemWriter = new CompositeItemWriter<>();
compositeItemWriter.setDelegates(Arrays.asList(updateItemWriter(), insertItemWriter()));
return compositeItemWriter;
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
return jobBuilderFactory.get("job")
.start(stepBuilderFactory
.get("step").<Person, Person>chunk(2)
.reader(itemReader())
.writer(itemWriter())
.build())
.build();
}
@Bean
public JobLauncherTestUtils jobLauncherTestUtils() {
return new JobLauncherTestUtils();
}
}
public static class UpdateItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public UpdateItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("foo".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("UPDATE people SET name = 'foo!!' WHERE id = 1");
}
}
}
}
public static class InsertItemWriter implements ItemWriter<Person> {
private JdbcTemplate jdbcTemplate;
public InsertItemWriter(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
public void write(List<? extends Person> items) {
for (Person person : items) {
if ("bar".equalsIgnoreCase(person.getName())) {
jdbcTemplate.update("INSERT INTO people (id, name) VALUES (?, ?)", person.getId(), person.getName());
throw new IllegalStateException("Something went wrong!");
}
}
}
}
public static class Person {
private long id;
private String name;
public Person() {
}
public Person(long id, String name) {
this.id = id;
this.name = name;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
我的示例使用自定义项目编写器,但这也适用于两个 JdbcBatchItemWriter
。
希望对您有所帮助!