Does Spring Batch have a feature to keep track of processed rows?

I'm currently using Spring Batch to update my entities from an external application that writes directly into my database through an ODBC connection.

For the sake of simplicity, here is a simplified version of the table schema (called `importshipmentdata` in the Java code below):

- id (integer)
- entity1_data1 (character varying)
- entity1_data2 (character varying)
- entity2_data2 (character varying)
- import_date (date_created timestamp with time zone)

Here is my job configuration:

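```java
import javax.inject.Inject;
import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

@Configuration
@EnableBatchProcessing
public class ImportShippingConfig {

    @Inject
    private JobBuilderFactory jobs;

    @Inject
    private StepBuilderFactory steps;

    @Inject
    private JobRepository jobRepository;

    @Inject
    private DataSource dataSource;

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        // Asynchronous launcher: run() returns immediately, the job executes on another thread
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return jobLauncher;
    }

    @Bean
    public ItemReader<ImportShippingItem> reader() {
        // Reads every row of importshipmentdata; nothing marks a row as already handled yet
        JdbcCursorItemReader<ImportShippingItem> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        BeanPropertyRowMapper<ImportShippingItem> mapper = new BeanPropertyRowMapper<>(ImportShippingItem.class);
        reader.setSql("SELECT * FROM importshipmentdata");
        reader.setRowMapper(mapper);
        return reader;
    }

    @Bean
    public ItemProcessor<ImportShippingItem, ImportShippingItem> processor() {
        return new ImportShippingItemProcessor();
    }

    @Bean
    public ItemWriter<ImportShippingItem> writer() {
        return new ImportShippingItemWriter();
    }

    @Bean
    public Job ShippingImporter() {
        return jobs.get("ShippingImporter").start(importShipping()).build();
    }

    @Bean
    public Step importShipping() {
        return steps.get("importShipping")
            // One transaction is committed per chunk of 5 items
            .<ImportShippingItem, ImportShippingItem>chunk(5)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
    }

}
```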

(Don't be offended by the `SELECT *`, I'm still at the POC stage :))

I could use the `import_date` field, but then again, I'm afraid it won't handle job failures very well.

I found the answer while editing my question, so I'll post it here.

As stated in the documentation:

By default, all of the ItemReader and ItemWriter implementations store their current state in the ExecutionContext before it is committed. However, this may not always be the desired behavior. For example, many developers choose to make their database readers 'rerunnable' by using a process indicator. An extra column is added to the input data to indicate whether or not it has been processed. When a particular record is being read (or written out) the processed flag is flipped from false to true. The SQL statement can then contain an extra statement in the where clause, such as "where PROCESSED_IND = false", thereby ensuring that only unprocessed records will be returned in the case of a restart. In this scenario, it is preferable to not store any state, such as the current row number, since it will be irrelevant upon restart. For this reason, all readers and writers include the 'saveState' property:
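To make the quoted process-indicator idea concrete, here is a dependency-free simulation in plain Java, not Spring Batch code. `ShipmentRow` and `ProcessIndicatorDemo` are hypothetical stand-ins: a list of rows plays the role of the table, `readUnprocessed` plays the role of `WHERE processed = false`, and the "writer" flips the flag for every row it handles. A failure is simulated between chunks, and the second run picks up exactly the unfinished rows without any stored row number.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a table row with a "processed" indicator column
class ShipmentRow {
    final int id;
    boolean processed; // the process indicator, false until the row has been written

    ShipmentRow(int id) {
        this.id = id;
    }
}

class ProcessIndicatorDemo {

    // Equivalent of "SELECT ... WHERE processed = false": no position is remembered,
    // the filter alone decides what is left to do (the saveState=false idea)
    static List<ShipmentRow> readUnprocessed(List<ShipmentRow> table) {
        List<ShipmentRow> result = new ArrayList<>();
        for (ShipmentRow row : table) {
            if (!row.processed) {
                result.add(row);
            }
        }
        return result;
    }

    // Processes unprocessed rows in chunks and returns the ids written in this run.
    // failAfterChunks simulates a crash after that many chunks; -1 means never fail.
    static List<Integer> run(List<ShipmentRow> table, int chunkSize, int failAfterChunks) {
        List<ShipmentRow> input = readUnprocessed(table);
        List<Integer> written = new ArrayList<>();
        int chunks = 0;
        for (int start = 0; start < input.size(); start += chunkSize) {
            if (chunks == failAfterChunks) {
                throw new IllegalStateException("simulated failure after " + chunks + " chunk(s)");
            }
            List<ShipmentRow> chunk = input.subList(start, Math.min(start + chunkSize, input.size()));
            for (ShipmentRow row : chunk) {
                written.add(row.id);  // "write" the item
                row.processed = true; // flip the flag as part of the same chunk
            }
            chunks++;
        }
        return written;
    }
}
```

With 12 rows and a chunk size of 5, a run that fails after the first chunk leaves rows 6 to 12 unprocessed, and the restart writes only those. In a real job, the flag update and the item write would share the chunk transaction, so a failure inside a chunk rolls both back together.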

So I'll add a `processed` column to the table and change the query to `SELECT entity1_data1, entity1_data2, entity2_data1 FROM importshipmentdata WHERE processed = false`.

Then my writer will set that column to `true` when it writes an item, and I'll set `saveState=false` on the reader.
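A minimal sketch of that plan against the Spring Batch 4.x API (the version implied by `JobBuilderFactory` in the question). Several things here are assumptions, not taken from the question: the boolean `processed` column, an `ImportShippingItem.getId()` getter, the hypothetical `ProcessIndicatorConfig` and `MarkProcessedItemWriter` class names, and that `ImportShippingItemWriter` implements `ItemWriter<ImportShippingItem>`. These beans would replace the `reader()` and `writer()` beans of the original configuration, and the step would then need to receive them as injected parameters instead of calling the methods directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import javax.sql.DataSource;

import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;

// Sketch only: assumes a boolean "processed" column and ImportShippingItem.getId()
@Configuration
public class ProcessIndicatorConfig {

    @Bean
    public JdbcCursorItemReader<ImportShippingItem> reader(DataSource dataSource) {
        JdbcCursorItemReader<ImportShippingItem> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        // Only rows not yet flagged are returned, so a restart skips finished work
        reader.setSql("SELECT id, entity1_data1, entity1_data2, entity2_data2 "
                + "FROM importshipmentdata WHERE processed = false");
        reader.setRowMapper(new BeanPropertyRowMapper<>(ImportShippingItem.class));
        // Do not store the cursor position in the ExecutionContext: on restart the
        // WHERE clause already excludes processed rows, so an old row count would be wrong
        reader.setSaveState(false);
        return reader;
    }

    @Bean
    public CompositeItemWriter<ImportShippingItem> writer(DataSource dataSource) {
        CompositeItemWriter<ImportShippingItem> writer = new CompositeItemWriter<>();
        // Existing writer first, then the flag update, both called within the chunk's transaction
        writer.setDelegates(Arrays.asList(
                new ImportShippingItemWriter(),
                new MarkProcessedItemWriter(dataSource)));
        return writer;
    }

    // Hypothetical helper writer that flips the process indicator for each written item
    public static class MarkProcessedItemWriter implements ItemWriter<ImportShippingItem> {
        private final JdbcTemplate jdbcTemplate;

        public MarkProcessedItemWriter(DataSource dataSource) {
            this.jdbcTemplate = new JdbcTemplate(dataSource);
        }

        @Override
        public void write(List<? extends ImportShippingItem> items) {
            List<Object[]> args = new ArrayList<>();
            for (ImportShippingItem item : items) {
                args.add(new Object[] { item.getId() });
            }
            jdbcTemplate.batchUpdate("UPDATE importshipmentdata SET processed = true WHERE id = ?", args);
        }
    }
}
```

The flag update only commits or rolls back together with the entity updates if the step's transaction manager covers the same `DataSource` that both writers use; that is the usual case with `@EnableBatchProcessing` and a single JDBC `DataSource`, but it should be checked if `ImportShippingItemWriter` writes through JPA or another connection. Items filtered out by the processor (returned as `null`) never reach the writer, so they would stay unflagged and be read again on the next run.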

You can use the `JobRepository` to get the status and the jobId; this repository holds all the data Spring Batch records about your jobs.
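For example, a small helper along these lines could report the outcome of the last run. It is a sketch: `ImportStatusChecker` and `describeLastRun` are hypothetical names, the job name `ShippingImporter` comes from the configuration above, and the `JobParameters` must be the same ones the job was launched with.

```java
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.repository.JobRepository;

// Hypothetical helper that summarizes the last ShippingImporter execution
public class ImportStatusChecker {
    private final JobRepository jobRepository;

    public ImportStatusChecker(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    public String describeLastRun(JobParameters params) {
        // Returns null when the job has never been launched with these parameters
        JobExecution last = jobRepository.getLastJobExecution("ShippingImporter", params);
        if (last == null) {
            return "ShippingImporter has not run with these parameters";
        }
        BatchStatus status = last.getStatus();
        return "jobId=" + last.getJobId() + ", executionId=" + last.getId() + ", status=" + status;
    }
}
```

A `FAILED` status here signals that the job should be restarted; combined with the `processed` flag, the restart then reads only the rows that were not finished.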