Spring StepExecutionListener 中的批量交易

Spring Batch Transactions in StepExecutionListener

我有一个 spring 批处理作业,它从 Web 服务读取数据,在处理器中进行一些丰富,然后保存到数据库。如果有人为同一组参数运行同一个作业两次,我想删除数据库中的旧数据,然后作为该作业的一部分重新写入。

删除逻辑我已经写在StepExecutionListener Before Step Method中了

如何使我的步骤具有事务性,以便在作业中出现错误时回滚删除操作?

this.stepBuilderFactory.get("xStep")
.<Item,Item>chunk(1000)
.reader(xReader)
.processor(xProcessor)
.writer(xWriter)
.listener(xStepExecutionListenerForDelete)
.build()

How can I make my step transactional so that if there is an error in the job the delete operation is rolledback?

您可以将删除逻辑编写为项目编写器的一部分,它在 Spring 批处理驱动的事务中调用。如果事务因任何原因被回滚,你的删除操作将被回滚。请注意,项目写入器不仅用于插入数据,还可以用于更新数据和删除数据(以MongoItemWriter#setDelete为例)。

作业参数

如果已经存在具有相同“标识”作业参数的实例,作业将不会开始。

作业将关闭,但出现以下异常: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException

您可以创建一个随机生成的值或日期的作业参数。在您的侦听器中,您可以验证前一个作业实例的作业参数,不包括您的“唯一”作业参数。

事务监听器

将@Transactional 注释添加到侦听器以将其包装在事务中。

例子

@Transactional
public class DataErasureListener implements StepExecutionListener {

    @Autowired
    JobExplorer jobExplorer;

    @Override
    public void beforeStep(StepExecution stepExecution) {

        String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
        Map<String, JobParameter> currentJobParameters = stepExecution.getJobParameters().getParameters();

        int jobInstanceCount = jobExplorer.getJobInstanceCount(jobName);

        if (jobInstanceCount == 1) {
            //No prior run
            return;
        }

        //The list of JobInstances are in descending order of creation. Grab the 2nd one.
        JobInstance priorJobInstance = jobExplorer.getJobInstances(jobName, 0, jobInstanceCount).get(1);
        JobExecution priorJobExecution= jobExplorer.getLastJobExecution(priorJobInstance);
        Map<String, JobParameter> priorJobParameters = priorJobExecution.getJobParameters().getParameters();

        //Compare prior job parameters excluding "unique" job parameters
        currentJobParameters.remove("unique");
        priorJobParameters.remove("unique");

        if (currentJobParameters.equals(priorJobParameters)) {
            //Delete old data
        }
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        return stepExecution.getExitStatus();
    }
}