Spring Batch - Getting a DeadlockLoserDataAccessException when trying to read/write to the same table
I am working on a Spring Batch application that reads unprocessed data from Table A, processes the data, inserts the processed data into Table B, and then updates the rows in Table A to mark them as processed. However, while inserting the data into Table B works fine, I keep getting a DeadlockLoserDataAccessException every time I try to update Table A. I believe this is because the cursor from the JdbcCursorItemReader used to read Table A is getting in the way of updating the table. How would I go about fixing this?
I use a JdbcCursorItemReader and a CompositeItemWriter in the Spring Batch step. The chunk size is 1.
The architecture is ETL-like: read data from a source, process it, and write it to a target. I try to avoid this kind of update logic in my processes, because it adds significant overhead and causes exactly the problems you describe. So maybe you can reconsider the architecture...
If not, I really recommend a proper index for the update - depending on the search criteria you use. This not only makes the update cheaper, but also means the SQL needs to access only a single row, avoiding an additional table scan for the update.
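For illustration, an index matching the UPDATE's search criteria might look like the following sketch (the table and column names here are hypothetical stand-ins for your Table A; adapt them to your actual schema and WHERE clause):

```sql
-- Hypothetical schema: TABLE_A with a key column and a PROCESSED flag.
-- An index covering the UPDATE's WHERE clause lets the statement locate
-- the target row via an index scan instead of a full table scan.
create index tablea_proc_ix on table_a (id, processed);

-- The update then touches exactly one row through the index:
update table_a set processed = 1 where id = ? and processed = 0;
```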
I suggest you redesign your transaction logic to "lock" the necessary TABLE A rows by marking them as 'PROCESSED' at the beginning of the transaction, and not update them again at the end of the transaction.
See the example below.
-- *** Example of queue processing in DB2 ***
-- The following registry variables must be set:
-- DB2_SKIPINSERTED=YES
-- DB2_SKIPDELETED=YES
-- DB2_EVALUNCOMMITTED=YES
-- Don't forget to db2stop/db2start after setting them to make the changes take effect.
create table test(id int not null, not_processed dec(1) default 1, constraint np check (not_processed=1));
-- 'exclude null keys' is available starting from V10.5
create index test1 on test(not_processed) exclude null keys;
alter table test volatile; -- easy way to force an ixscan disregarding the collected table statistics
insert into test (id) values 1,2;
-- Every session starts its transaction with locking its own set of rows (only one in the example),
-- which becomes invisible for the same statement issued by other concurrent transactions
-- due to setting registry variables above.
-- No lock waits expected on such an update.
update (select not_processed from test where not_processed=1 fetch first 1 row only) set not_processed=null;
-- work with other tables comes below
-- ...
-- transaction end
I believe this is due to the cursor from the JdbcCursorItemReader that was used to read Table A getting in the way of updating the table. How would I go about fixing this?
This should not cause an issue if the read, the insert, and the update are all within the same transaction (which is the case when you use a chunk-oriented step).
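Roughly, a chunk-oriented step wraps each chunk in a single transaction; a simplified sketch (not the actual Spring Batch internals) of what happens per chunk:

```
begin transaction
  for i in 1..chunkSize:
    item = reader.read()           // cursor read from Table A
    item = processor.process(item)
  writer.write(items)              // insert into Table B + update Table A
commit                             // read, insert, and update commit atomically
```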
I use a JDBCCursorItemReader and CompositeItemWriter in the Spring Batch. The chunk size is 1.
Here is a quick (self-contained) example with the same configuration as the one you mentioned:
import java.util.Arrays;
import javax.sql.DataSource;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.builder.CompositeItemWriterBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;
@Configuration
@EnableBatchProcessing
public class MyJob {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public JdbcCursorItemReader<Person> itemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.name("personItemReader")
.dataSource(dataSource())
.sql("select id, name from person where processed = false")
.beanRowMapper(Person.class)
.saveState(false) // process indicator pattern, no need to save state (see https://docs.spring.io/spring-batch/4.1.x/reference/html/readersAndWriters.html#process-indicator)
.build();
}
@Bean
public ItemProcessor<Person, Person> itemProcessor() {
return item -> new Person(item.getId(), item.getName().toUpperCase());
}
@Bean
public CompositeItemWriter<Person> itemWriter() {
return new CompositeItemWriterBuilder<Person>()
.delegates(Arrays.asList(peopleItemWriter(), personItemUpdater()))
.ignoreItemStream(true)
.build();
}
@Bean
public JdbcBatchItemWriter<Person> peopleItemWriter() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("insert into people (name) values (:name)")
.build();
}
@Bean
public JdbcBatchItemWriter<Person> personItemUpdater() {
return new JdbcBatchItemWriterBuilder<Person>()
.dataSource(dataSource())
.beanMapped()
.sql("update person set processed = true where id = :id")
.build();
}
@Bean
public Step step() {
return steps.get("step")
.<Person, Person>chunk(1)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(step())
.build();
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.H2)
.addScript("/org/springframework/batch/core/schema-drop-h2.sql")
.addScript("/org/springframework/batch/core/schema-h2.sql")
.build();
}
@Bean
public JdbcTemplate jdbcTemplate(DataSource dataSource) {
return new JdbcTemplate(dataSource);
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JdbcTemplate jdbcTemplate = context.getBean(JdbcTemplate.class);
jdbcTemplate.execute("CREATE TABLE person (id int IDENTITY PRIMARY KEY, name VARCHAR(10), processed boolean);");
jdbcTemplate.execute("CREATE TABLE people (id int IDENTITY PRIMARY KEY, name VARCHAR(10));");
jdbcTemplate.execute("insert into person (id, name, processed) values (1, 'foo', false);");
jdbcTemplate.execute("insert into person (id, name, processed) values (2, 'bar', false);");
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
Integer nbInsertedFoos = jdbcTemplate.queryForObject("select count(id) from people where name = 'FOO'", Integer.class);
Integer nbInsertedBars = jdbcTemplate.queryForObject("select count(id) from people where name = 'BAR'", Integer.class);
System.out.println("nbInsertedFoos in people table = " + nbInsertedFoos);
System.out.println("nbInsertedBars in people table = " + nbInsertedBars);
Integer nbUpdatedPersons = jdbcTemplate.queryForObject("select count(*) from person where processed = true", Integer.class);
System.out.println("nbUpdatedPersons in person table = " + nbUpdatedPersons);
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
}
It reads persons from the Person table (your Table A), uppercases their names, and writes the result to the People table (your Table B in this case). It then updates the processed flag on the Person table.
If you run the sample, you should see:
nbInsertedFoos in people table = 1
nbInsertedBars in people table = 1
nbUpdatedPersons in person table = 2
without any deadlock exception.
Hope this helps.