Does Spring Batch have a feature to keep track of processed rows?
I currently update my entities with Spring Batch from an external application that writes directly to my database over an ODBC connection.
For simplicity, here is a simplified version of the table schema (called `importshipmentdata` in the Java code below):
id (integer)
entity1_data1 (character varying)
entity1_data2 (character varying)
entity2_data2 (character varying)
import_date (date_created timestamp with time zone)
Here is my job configuration:
@Configuration
@EnableBatchProcessing
public class ImportShippingConfig {
@Inject
private JobBuilderFactory jobs;
@Inject
private StepBuilderFactory steps;
@Inject
private JobRepository jobRepository;
@Inject
private DataSource dataSource;
@Bean
public JobLauncher jobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
return jobLauncher;
}
@Bean
public ItemReader<ImportShippingItem> reader() {
JdbcCursorItemReader<ImportShippingItem> reader = new JdbcCursorItemReader<>();
reader.setDataSource(dataSource);
BeanPropertyRowMapper<ImportShippingItem> mapper = new BeanPropertyRowMapper<>(ImportShippingItem.class);
reader.setSql("SELECT * FROM importshipmentdata");
reader.setRowMapper(mapper);
return reader;
}
@Bean
public ItemProcessor<ImportShippingItem, ImportShippingItem> processor() {
return new ImportShippingItemProcessor();
}
@Bean
public ItemWriter<ImportShippingItem> writer() {
return new ImportShippingItemWriter();
}
@Bean
public Job ShippingImporter() {
return jobs.get("ShippingImporter").start(importShipping()).build();
}
@Bean
public Step importShipping() {
return steps.get("importShipping")
.<ImportShippingItem, ImportShippingItem>chunk(5)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
}
(Don't be offended by the SELECT *, I'm still at the POC stage :))
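With this configuration in place, the job can be kicked off through the injected JobLauncher. A minimal sketch (the run.ts parameter name is illustrative; unique parameters make the JobRepository create a fresh JobInstance per run):

```java
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;

// Somewhere with access to the jobLauncher() and ShippingImporter() beans:
JobParameters params = new JobParametersBuilder()
        .addLong("run.ts", System.currentTimeMillis()) // illustrative unique parameter
        .toJobParameters();
JobExecution execution = jobLauncher().run(ShippingImporter(), params);
System.out.println("Exit status: " + execution.getExitStatus());
```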
I could use the import_date field, but then again, I'm afraid it wouldn't handle job failures well.
I found the answer while editing my question, so I'll post it here. As stated in the Spring Batch documentation:
By default, all of the ItemReader and ItemWriter implementations store
their current state in the ExecutionContext before it is committed.
However, this may not always be the desired behavior. For example,
many developers choose to make their database readers 'rerunnable' by
using a process indicator. An extra column is added to the input data
to indicate whether or not it has been processed. When a particular
record is being read (or written out) the processed flag is flipped
from false to true. The SQL statement can then contain an extra
statement in the where clause, such as "where PROCESSED_IND = false",
thereby ensuring that only unprocessed records will be returned in the
case of a restart. In this scenario, it is preferable to not store any
state, such as the current row number, since it will be irrelevant
upon restart. For this reason, all readers and writers include the
'saveState' property:
So I will add a processed column to the table and change the query to SELECT entity1_data1, entity1_data2, entity2_data1 FROM table WHERE processed = false.
My writer will then flip the column to true as it writes each item, and I'll set saveState=false on the reader.
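A sketch of what the process-indicator version of the reader and writer could look like (the UPDATE statement and the jdbcTemplate field are assumptions, not part of the original configuration):

```java
@Bean
public ItemReader<ImportShippingItem> reader() {
    JdbcCursorItemReader<ImportShippingItem> reader = new JdbcCursorItemReader<>();
    reader.setDataSource(dataSource);
    // Only unprocessed rows are selected, so a restart naturally resumes
    // where the failed run left off.
    reader.setSql("SELECT id, entity1_data1, entity1_data2, entity2_data1 "
            + "FROM importshipmentdata WHERE processed = false");
    reader.setRowMapper(new BeanPropertyRowMapper<>(ImportShippingItem.class));
    // The processed flag already marks our position; no need to save reader state.
    reader.setSaveState(false);
    return reader;
}

@Bean
public ItemWriter<ImportShippingItem> writer() {
    return items -> {
        for (ImportShippingItem item : items) {
            // ... write the entity as before ...
            // Assumed: a JdbcTemplate field and a getId() accessor on the item.
            jdbcTemplate.update(
                    "UPDATE importshipmentdata SET processed = true WHERE id = ?",
                    item.getId());
        }
    };
}
```

Because the flag flip and the entity write happen in the same chunk transaction, a failure rolls both back together, which is what makes the job safely rerunnable.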
You can use the JobRepository to get the status and the jobId; this repository exposes all of the Spring Batch metadata.
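To read that metadata programmatically, Spring Batch also offers the JobExplorer, a read-only view over the same tables the JobRepository writes. A sketch, assuming a configured JobExplorer bean:

```java
import java.util.List;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;

// Look up the most recent instance of the ShippingImporter job
// and print the status of each of its executions.
List<JobInstance> instances = jobExplorer.getJobInstances("ShippingImporter", 0, 1);
if (!instances.isEmpty()) {
    for (JobExecution execution : jobExplorer.getJobExecutions(instances.get(0))) {
        System.out.println(execution.getId() + " -> " + execution.getStatus());
    }
}
```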