Spring 批处理计划程序:作业侦听器仅在作业第一次运行时起作用

Spring Batch Scheduler : Job Listener works only for the first time the job runs

我正在开发一个 Spring 批处理应用程序以使用 java 配置执行两个批处理作业。最近我添加了一个 Spring 调度程序来安排我编写的作业之一。侦听器在作业第一次完成时被调用,但在下一次执行后不会被调用。以下是我的作业配置代码:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration{

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public MongoTemplate mongoTemplate;

    @Autowired
    UnitsRepository unitsRepos;

    @Autowired
    UserRepository userRepository;

    @Autowired
    ElectraService electraService;

    /*@Autowired InfrastructureConfiguration infrastructureConfiguration;*/

    // tag::readerwriterprocessor[]
    @Bean
    @StepScope
    public MongoItemReader<UserBean> reader() {
        MongoItemReader<UserBean> reader = new MongoItemReader<UserBean>();
        reader.setTemplate(mongoTemplate);
        reader.setCollection("user");
        reader.setQuery("{ '_id': 'U3'}");
        reader.setSort(new HashMap<String,Direction>(){{put("_id", Direction.ASC);}});
        reader.setTargetType(UserBean.class);
        return reader;
    }

    @Bean
    public ExceedUsageProcessor processor() {
        return new ExceedUsageProcessor(unitsRepos,electraService);
    }

    @Bean
    public AnomalyProcessor anomalyProcessor() {
        return new AnomalyProcessor(unitsRepos);
    }
    @Bean
    @StepScope
    public MongoItemWriter<DayByDayUsage> writer() {
        MongoItemWriter<DayByDayUsage> writer = new MongoItemWriter<DayByDayUsage>();
        writer.setTemplate(mongoTemplate);
        writer.setCollection("usage");
        return writer;
    }
    // end::readerwriterprocessor[]

    // tag::listener[]

    @Bean
    @StepScope
    public MongoItemWriter<AnomalyBean> anomalyWriter() {
        MongoItemWriter<AnomalyBean> writer = new MongoItemWriter<AnomalyBean>();
        writer.setTemplate(mongoTemplate);
        writer.setCollection("anomaly");
        return writer;
    }

    @Bean
    public ExceedJobNotificationListener listener() {
        return new ExceedJobNotificationListener(mongoTemplate);
    }

    @Bean
    public AnomalyJobListener anomalyListener(){
        return new AnomalyJobListener(mongoTemplate,userRepository);

    }
    // end::listener[]

    // tag::jobstep[]
    @Bean
    public Job notifyUserJob() {
        return jobBuilderFactory.get("notifyUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Job anomalyJob() {
        return jobBuilderFactory.get("anomalyJob")
                .incrementer(new RunIdIncrementer())
                .listener(anomalyListener())
                .flow(step2())
                .end()
                .build();
    }
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<UserBean, DayByDayUsage> chunk(50)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .taskExecutor(taskExecutor())
                .throttleLimit(10)
                .allowStartIfComplete(true)
                .build();
    }
    // end::jobstep[]

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<UserBean, AnomalyBean> chunk(50)
                .reader(reader())
                .processor(anomalyProcessor())
                .writer(anomalyWriter())
                .taskExecutor(taskExecutor())
                .throttleLimit(10)
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        // TODO Auto-generated method stub
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.HSQL)
                .addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql")
                .build();
    }
}

下面是我的调度程序的代码:

@Component
public class AnomalyScheduler {
     private Job myImportJob;
        private JobLauncher jobLauncher;

        @Autowired
        public AnomalyScheduler(JobLauncher jobLauncher, @Qualifier("anomalyJob") Job myImportJob){
            this.myImportJob = myImportJob; 
            this.jobLauncher = jobLauncher;
       }

       @Scheduled(fixedDelay=60000)
       public void runJob(){
           try {
            jobLauncher.run(myImportJob, new JobParameters());
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
       }
}

我的听众如下:

    public class AnomalyJobListener extends JobExecutionListenerSupport {
    private PushNotification pushNotification = PushNotification
            .getPushNotificationInstance();

    @Autowired
    public AnomalyJobListener(MongoTemplate mongoTemplate,
            UserRepository userRepository) {
        List<AnomalyBean> anomalies = new ArrayList<AnomalyBean>(0);
        anomalies = mongoTemplate.findAll(AnomalyBean.class);
        int numAnomalies = anomalies.size();
        List<UserBean> admins = new ArrayList<UserBean>(0);
        admins = userRepository.userByType("admin");
        if (numAnomalies > 0) {
            for (UserBean admin : admins) {
                pushNotification.pushNotification(numAnomalies
                        + " anomalies detected ! Keep an eye on that.",
                        admin.getDeveiceId());
            }

        }
    }
}

这是控制台输出:

2016-05-04 08:17:39.565  INFO 9348 --- [           main] com.electra.Application                  : Starting Application on all-PC with PID 9348 (F:\Electrck\ElectrackJobRepository\ElectrackJobs\bin started by all in F:\Electrck\ElectrackJobRepository\ElectrackJobs)
    2016-05-04 08:17:39.571  INFO 9348 --- [           main] com.electra.Application                  : No active profile set, falling back to default profiles: default
    2016-05-04 08:17:39.681  INFO 9348 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@20b2475a: startup date [Wed May 04 08:17:39 IST 2016]; root of context hierarchy
    2016-05-04 08:17:41.943  WARN 9348 --- [           main] o.s.c.a.ConfigurationClassEnhancer       : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
    2016-05-04 08:17:41.966  WARN 9348 --- [           main] o.s.c.a.ConfigurationClassEnhancer       : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
    2016-05-04 08:17:42.258  INFO 9348 --- [           main] o.s.j.d.e.EmbeddedDatabaseFactory        : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa'
    2016-05-04 08:17:42.646  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
    2016-05-04 08:17:42.658  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 12 ms.
    2016-05-04 08:18:01.793  INFO 9348 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 
    2016-05-04 08:18:01.812  INFO 9348 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService  'taskExecutor'
    push status [ messageId=0:1462330092323002%9b3f4867f9fd7ecd ]
    push status [ messageId=0:1462330095502779%9b3f4867f9fd7ecd ]
    2016-05-04 08:18:16.883  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
    2016-05-04 08:18:16.927  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 20 ms.
    2016-05-04 08:18:17.392  INFO 9348 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
    2016-05-04 08:18:17.413  INFO 9348 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
    2016-05-04 08:18:17.737  INFO 9348 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
    2016-05-04 08:18:17.766  INFO 9348 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: HSQL
    2016-05-04 08:18:18.032  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
    2016-05-04 08:18:18.147  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=notifyUserJob]] launched with the following parameters: [{run.id=1}]
    2016-05-04 08:18:18.187  INFO 9348 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
    2016-05-04 08:18:22.044  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
    2016-05-04 08:18:22.079  INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:18:28.990  INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl   : date Mon May 02 08:18:12 IST 1
    2016-05-04 08:18:28.991  INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl       : push new unit
    2016-05-04 08:18:32.581  INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl   : date Wed May 04 08:18:32 IST 2016
    2016-05-04 08:18:32.581  INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl       : push new unit
    2016-05-04 08:19:16.876  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=notifyUserJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
    2016-05-04 08:19:16.999  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{run.id=1}]
    2016-05-04 08:19:17.053  INFO 9348 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:19:17.491  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
    2016-05-04 08:19:52.399  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
    2016-05-04 08:19:52.401  INFO 9348 --- [           main] com.electra.Application                  : Started Application in 133.639 seconds (JVM running for 134.724)
    2016-05-04 08:20:21.066  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
    2016-05-04 08:20:21.288  INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:20:31.103  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

请告诉我我做错了什么以及为什么后续尝试没有执行侦听器。

您的侦听器构造函数中只有代码。但不是在 beforeJob() afterJob() 或任何其他方法中......所以我猜监听器被调用但什么都不做...... 所以你需要覆盖适当的监听器方法。