Spring StepExecutionListener 中的批量交易
Spring Batch Transactions in StepExecutionListener
我有一个 spring 批处理作业,它从 Web 服务读取数据,在处理器中进行一些丰富,然后保存到数据库。如果有人为同一组参数运行同一个作业两次,我想删除数据库中的旧数据,然后作为该作业的一部分重新写入。
删除逻辑我已经写在StepExecutionListener Before Step Method中了
如何使我的步骤具有事务性,以便在作业中出现错误时回滚删除操作?
this.stepBuilderFactory.get("xStep")
.<Item,Item>chunk(1000)
.reader(xReader)
.processor(xProcessor)
.writer(xWriter)
.listener(xStepExecutionListenerForDelete)
.build()
How can I make my step transactional so that if there is an error in the job the delete operation is rolledback?
您可以将删除逻辑编写为项目编写器的一部分,它在 Spring 批处理驱动的事务中调用。如果事务因任何原因被回滚,你的删除操作将被回滚。请注意,项目写入器不仅用于插入数据,还可以用于更新数据和删除数据(以MongoItemWriter#setDelete为例)。
作业参数
如果已经存在具有相同“标识”作业参数的实例,作业将不会开始。
作业将关闭,但出现以下异常:
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException
您可以创建一个随机生成的值或日期的作业参数。在您的侦听器中,您可以验证前一个作业实例的作业参数,不包括您的“唯一”作业参数。
事务监听器
将@Transactional 注释添加到侦听器以将其包装在事务中。
例子
@Transactional
public class DataErasureListener implements StepExecutionListener {
@Autowired
JobExplorer jobExplorer;
@Override
public void beforeStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
Map<String, JobParameter> currentJobParameters = stepExecution.getJobParameters().getParameters();
int jobInstanceCount = jobExplorer.getJobInstanceCount(jobName);
if (jobInstanceCount == 1) {
//No prior run
return;
}
//The list of JobInstances are in descending order of creation. Grab the 2nd one.
JobInstance priorJobInstance = jobExplorer.getJobInstances(jobName, 0, jobInstanceCount).get(1);
JobExecution priorJobExecution= jobExplorer.getLastJobExecution(priorJobInstance);
Map<String, JobParameter> priorJobParameters = priorJobExecution.getJobParameters().getParameters();
//Compare prior job parameters excluding "unique" job parameters
currentJobParameters.remove("unique");
priorJobParameters.remove("unique");
if (currentJobParameters.equals(priorJobParameters)) {
//Delete old data
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return stepExecution.getExitStatus();
}
}
我有一个 spring 批处理作业,它从 Web 服务读取数据,在处理器中进行一些丰富,然后保存到数据库。如果有人为同一组参数运行同一个作业两次,我想删除数据库中的旧数据,然后作为该作业的一部分重新写入。
删除逻辑我已经写在StepExecutionListener Before Step Method中了
如何使我的步骤具有事务性,以便在作业中出现错误时回滚删除操作?
this.stepBuilderFactory.get("xStep")
.<Item,Item>chunk(1000)
.reader(xReader)
.processor(xProcessor)
.writer(xWriter)
.listener(xStepExecutionListenerForDelete)
.build()
How can I make my step transactional so that if there is an error in the job the delete operation is rolledback?
您可以将删除逻辑编写为项目编写器的一部分,它在 Spring 批处理驱动的事务中调用。如果事务因任何原因被回滚,你的删除操作将被回滚。请注意,项目写入器不仅用于插入数据,还可以用于更新数据和删除数据(以MongoItemWriter#setDelete为例)。
作业参数
如果已经存在具有相同“标识”作业参数的实例,作业将不会开始。
作业将关闭,但出现以下异常:
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException
您可以创建一个随机生成的值或日期的作业参数。在您的侦听器中,您可以验证前一个作业实例的作业参数,不包括您的“唯一”作业参数。
事务监听器
将@Transactional 注释添加到侦听器以将其包装在事务中。
例子
@Transactional
public class DataErasureListener implements StepExecutionListener {
@Autowired
JobExplorer jobExplorer;
@Override
public void beforeStep(StepExecution stepExecution) {
String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
Map<String, JobParameter> currentJobParameters = stepExecution.getJobParameters().getParameters();
int jobInstanceCount = jobExplorer.getJobInstanceCount(jobName);
if (jobInstanceCount == 1) {
//No prior run
return;
}
//The list of JobInstances are in descending order of creation. Grab the 2nd one.
JobInstance priorJobInstance = jobExplorer.getJobInstances(jobName, 0, jobInstanceCount).get(1);
JobExecution priorJobExecution= jobExplorer.getLastJobExecution(priorJobInstance);
Map<String, JobParameter> priorJobParameters = priorJobExecution.getJobParameters().getParameters();
//Compare prior job parameters excluding "unique" job parameters
currentJobParameters.remove("unique");
priorJobParameters.remove("unique");
if (currentJobParameters.equals(priorJobParameters)) {
//Delete old data
}
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return stepExecution.getExitStatus();
}
}