Spring 批处理项目 Reader 只执行一次
Spring Batch Item Reader is executing only once
正在尝试执行Spring批处理,但遇到一个奇怪的问题,我们的ItemReader
class只执行了一次。
详情如下。
- If we have 1000 rows in DB.
- Our Item reader fetch 1000 rows from DB,and pass list to
ItemWriter
ItemWriter
successfully delete all items.
- Now ItemReader again tries to fetch the data from DB,but did not find,hence returns NULL,so execution stops.
- But we have configured batch to be executed with
Quartz
scheduler,which is every minute.
- Now if we insert let say 1000 rows in DB by dump import,the batch job should pick this data in next execution,but it is not even executing,although
JobLauncher
is executing.
配置:-
1.We 具有提交间隔等于 1 的 ItemReader、ItemWriter。
<batch:job id="csrfTokenBatchJob">
<batch:step id="step1">
<tasklet>
<chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></chunk>
</tasklet>
</batch:step>
</batch:job>
2.Job 计划每分钟触发一次。
<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
<property name="jobDetail" ref="jobDetail" />
<property name="cronExpression" value="0 0/1 * * * ?" />
</bean>
</property>
</bean>
3.Job配置
<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
<property name="jobClass" value="com.tavant.oauth.batch.job.CSRFTokenJobLauncher" />
<property name="jobDataAsMap">
<map>
<entry key="jobName" value="csrfTokenCleanUpBatchJob" />
<entry key="jobLocator" value-ref="jobRegistry" />
<entry key="jobLauncher" value-ref="jobLauncher" />
</map>
</property>
</bean>
第一次执行成功,后来没有执行,但是我在日志中看到JobLauncher
正在执行。
@Component("csrfTokenReader")
@Scope(value="step")
public class CSRFTokenReader implements ItemReader<List<CSRFToken>> {
private static final Logger logger = LoggerFactory.getLogger(CSRFTokenReader.class);
@Autowired
private CleanService cleanService;
@Override
public List<CSRFToken> read() {
List<CSRFToken> csrfTokenList = null;
try{
int keepUpto = Integer.valueOf(PropertiesContext.getInstance().getProperties().getProperty("token.keep", "1"));
Calendar calTime = Calendar.getInstance();
calTime.add(Calendar.HOUR, -keepUpto);
Date toKeep = calTime.getTime();
csrfTokenList = cleanService.getCSRFTokenByTime(toKeep);
}
catch(Throwable th){
logger.error("Exception in running job At " + new Date() + th);
}
if(CollectionUtils.isEmpty(csrfTokenList)){
return null;
}
return csrfTokenList;
}
}
编辑:--
public class CSRFTokenJobLauncher extends QuartzJobBean {
static final String JOB_NAME = "jobName";
private JobLocator jobLocator;
private JobLauncher jobLauncher;
public void setJobLocator(JobLocator jobLocator) {
this.jobLocator = jobLocator;
}
public void setJobLauncher(JobLauncher jobLauncher) {
this.jobLauncher = jobLauncher;
}
@Override
protected void executeInternal(JobExecutionContext context) {
Map<String, Object> jobDataMap = context.getMergedJobDataMap();
String jobName = (String) jobDataMap.get(JOB_NAME);
log.info("Quartz trigger firing with Spring Batch jobName="+jobName);
JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
try {
jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
}
catch (JobExecutionException e) {
log.error("Could not execute job.", e);
}
}
private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
JobParametersBuilder builder = new JobParametersBuilder();
for (Entry<String, Object> entry : jobDataMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof String && !key.equals(JOB_NAME)) {
builder.addString(key, (String) value);
}
else if (value instanceof Float || value instanceof Double) {
builder.addDouble(key, ((Number) value).doubleValue());
}
else if (value instanceof Integer || value instanceof Long) {
builder.addLong(key, ((Number)value).longValue());
}
else if (value instanceof Date) {
builder.addDate(key, (Date) value);
}
}
return builder.toJobParameters();
}
}
经过几个小时的浪费,问题现在似乎已经解决了,我已经在 tasklet.Now 中配置了 allow-start-if-complete="true"
批处理项目 Reader 正在按计划执行。
<batch:job id="csrfTokenBatchJob">
<batch:step id="step1">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
Spring 批量记录数据库中的每个作业执行。这就是为什么 spring 批处理需要区分每个作业 运行。它会检查该作业是否已在同一天执行,并且不会再次启动,除非任何作业参数与之前的 运行 不同,或者如果启用了完整设置则允许启动。
选项 1:- 如上所述,我们可以使用 allow-start-if-complete="true"
选项 2:- 始终传递作为当前日期时间戳的作业参数。这样作业参数值总是唯一的。
JobExecution jobExecution = jobLauncher.run(reportJob, new JobParametersBuilder()
.addDate("now", new Date()).build());
选项 3:- 使用增量器,例如 RunIdIncrementer,这样我们就不需要确保每次都传递唯一的作业参数。
@Bean
public Job job1(JobBuilderFactory jobs, Step s1) {
return jobs.get("job1")
.incrementer(new RunIdIncrementer())
.flow(s1)
.end()
.build();
}
正在尝试执行Spring批处理,但遇到一个奇怪的问题,我们的ItemReader
class只执行了一次。
详情如下。
- If we have 1000 rows in DB.
- Our Item reader fetch 1000 rows from DB,and pass list to
ItemWriter
ItemWriter
successfully delete all items.- Now ItemReader again tries to fetch the data from DB,but did not find,hence returns NULL,so execution stops.
- But we have configured batch to be executed with
Quartz
scheduler,which is every minute.- Now if we insert let say 1000 rows in DB by dump import,the batch job should pick this data in next execution,but it is not even executing,although
JobLauncher
is executing.
配置:-
1.We 具有提交间隔等于 1 的 ItemReader、ItemWriter。
<batch:job id="csrfTokenBatchJob">
<batch:step id="step1">
<tasklet>
<chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></chunk>
</tasklet>
</batch:step>
</batch:job>
2.Job 计划每分钟触发一次。
<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<bean id="cronTrigger" class="org.springframework.scheduling.quartz.CronTriggerBean">
<property name="jobDetail" ref="jobDetail" />
<property name="cronExpression" value="0 0/1 * * * ?" />
</bean>
</property>
</bean>
3.Job配置
<bean id="jobDetail" class="org.springframework.scheduling.quartz.JobDetailBean">
<property name="jobClass" value="com.tavant.oauth.batch.job.CSRFTokenJobLauncher" />
<property name="jobDataAsMap">
<map>
<entry key="jobName" value="csrfTokenCleanUpBatchJob" />
<entry key="jobLocator" value-ref="jobRegistry" />
<entry key="jobLauncher" value-ref="jobLauncher" />
</map>
</property>
</bean>
第一次执行成功,后来没有执行,但是我在日志中看到JobLauncher
正在执行。
@Component("csrfTokenReader")
@Scope(value="step")
public class CSRFTokenReader implements ItemReader<List<CSRFToken>> {
private static final Logger logger = LoggerFactory.getLogger(CSRFTokenReader.class);
@Autowired
private CleanService cleanService;
@Override
public List<CSRFToken> read() {
List<CSRFToken> csrfTokenList = null;
try{
int keepUpto = Integer.valueOf(PropertiesContext.getInstance().getProperties().getProperty("token.keep", "1"));
Calendar calTime = Calendar.getInstance();
calTime.add(Calendar.HOUR, -keepUpto);
Date toKeep = calTime.getTime();
csrfTokenList = cleanService.getCSRFTokenByTime(toKeep);
}
catch(Throwable th){
logger.error("Exception in running job At " + new Date() + th);
}
if(CollectionUtils.isEmpty(csrfTokenList)){
return null;
}
return csrfTokenList;
}
}
编辑:--
public class CSRFTokenJobLauncher extends QuartzJobBean {
static final String JOB_NAME = "jobName";
private JobLocator jobLocator;
private JobLauncher jobLauncher;
public void setJobLocator(JobLocator jobLocator) {
this.jobLocator = jobLocator;
}
public void setJobLauncher(JobLauncher jobLauncher) {
this.jobLauncher = jobLauncher;
}
@Override
protected void executeInternal(JobExecutionContext context) {
Map<String, Object> jobDataMap = context.getMergedJobDataMap();
String jobName = (String) jobDataMap.get(JOB_NAME);
log.info("Quartz trigger firing with Spring Batch jobName="+jobName);
JobParameters jobParameters = getJobParametersFromJobMap(jobDataMap);
try {
jobLauncher.run(jobLocator.getJob(jobName), jobParameters);
}
catch (JobExecutionException e) {
log.error("Could not execute job.", e);
}
}
private JobParameters getJobParametersFromJobMap(Map<String, Object> jobDataMap) {
JobParametersBuilder builder = new JobParametersBuilder();
for (Entry<String, Object> entry : jobDataMap.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof String && !key.equals(JOB_NAME)) {
builder.addString(key, (String) value);
}
else if (value instanceof Float || value instanceof Double) {
builder.addDouble(key, ((Number) value).doubleValue());
}
else if (value instanceof Integer || value instanceof Long) {
builder.addLong(key, ((Number)value).longValue());
}
else if (value instanceof Date) {
builder.addDate(key, (Date) value);
}
}
return builder.toJobParameters();
}
}
经过几个小时的浪费,问题现在似乎已经解决了,我已经在 tasklet.Now 中配置了 allow-start-if-complete="true"
批处理项目 Reader 正在按计划执行。
<batch:job id="csrfTokenBatchJob">
<batch:step id="step1">
<batch:tasklet allow-start-if-complete="true">
<batch:chunk reader="csrfTokenReader" writer="csrfTokenWriter" commit-interval="1"></batch:chunk>
</batch:tasklet>
</batch:step>
</batch:job>
Spring 批量记录数据库中的每个作业执行。这就是为什么 spring 批处理需要区分每个作业 运行。它会检查该作业是否已在同一天执行,并且不会再次启动,除非任何作业参数与之前的 运行 不同,或者如果启用了完整设置则允许启动。
选项 1:- 如上所述,我们可以使用 allow-start-if-complete="true"
选项 2:- 始终传递作为当前日期时间戳的作业参数。这样作业参数值总是唯一的。
JobExecution jobExecution = jobLauncher.run(reportJob, new JobParametersBuilder()
.addDate("now", new Date()).build());
选项 3:- 使用增量器,例如 RunIdIncrementer,这样我们就不需要确保每次都传递唯一的作业参数。
@Bean
public Job job1(JobBuilderFactory jobs, Step s1) {
return jobs.get("job1")
.incrementer(new RunIdIncrementer())
.flow(s1)
.end()
.build();
}