Spring Batch does not run reader and writer functions after initialization

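I have a Spring Batch job that runs the read() function once at startup and never again. I also have a JobExecutionListenerSupport with beforeJob and afterJob; those methods run on every launch, but the read and write functions do not.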

Batch configuration:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public ListItemReader<Blog> reader() {
        System.out.println("reader");
        return new ListItemReader<Blog>(new ArrayList<Blog>());
    }

    @Bean
    public JdbcBatchItemWriter<Blog> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Blog>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Blog>())
                .sql("INSERT INTO blog (param, blog_url, blog_title) VALUES (:identity.param,:identity.url, :blogTitle)")
                .dataSource(dataSource).build();
    }

    @Bean
    public Job importCrawlingBatch(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importCrawlingBatch").incrementer(new RunIdIncrementer()).listener(listener)
                .flow(step1).end().build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Blog> writer) {
        return stepBuilderFactory.get("step1")
                .<Blog, Blog>chunk(10)
                .reader(reader())
                .writer(writer)
                .faultTolerant()
                .skip(DuplicateKeyException.class).skipPolicy(new ItemSkipPolicy())
                .allowStartIfComplete(true).build();
    }
}

How I run it repeatedly (every minute, via a scheduler):

@Component
@AllArgsConstructor
@EnableScheduling
public class ScheduledJobConfiguration {
    private final JobLauncher jobLauncher;

    private final Job ourJob;

    @Scheduled(cron = "0 */1 * * * *")
    public void startBatchJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().addString("time", new Date().toString())
                .toJobParameters();

        jobLauncher.run(ourJob, jobParameters);
    }

}

And my JobExecutionListenerSupport:

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        super.beforeJob(jobExecution);

        System.out.println(">>> CRAWLING STARTED <<<");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println(">>> CRAWLING FINISHED <<<");
        }
    }

}

Console log:

 .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.4)

23:46:02 INFO  c.b.a.Application:55 - Starting Application using Java 15 on katesel with PID 6081 (/home/atesel/Projects/PostgreSQLBlogCrawling/Workspace/blogcrawling/target/classes started by atesel in /home/atesel/Projects/PostgreSQLBlogCrawling/Workspace/blogcrawling)
23:46:02 INFO  c.b.a.Application:662 - No active profile set, falling back to default profiles: default
23:46:03 INFO  o.s.d.r.c.RepositoryConfigurationDelegate:128 - Bootstrapping Spring Data JPA repositories in DEFAULT mode.
23:46:03 INFO  o.s.d.r.c.RepositoryConfigurationDelegate:188 - Finished Spring Data repository scanning in 31 ms. Found 2 JPA repository interfaces.
23:46:03 INFO  o.s.b.w.e.t.TomcatWebServer:108 - Tomcat initialized with port(s): 8080 (http)
23:46:03 INFO  o.a.c.c.StandardService:173 - Starting service [Tomcat]
23:46:03 INFO  o.a.c.c.StandardEngine:173 - Starting Servlet engine: [Apache Tomcat/9.0.44]
23:46:03 INFO  o.a.c.c.C.[.[.[/]:173 - Initializing Spring embedded WebApplicationContext
23:46:03 INFO  o.s.b.w.s.c.ServletWebServerApplicationContext:289 - Root WebApplicationContext: initialization completed in 1074 ms
23:46:03 INFO  c.z.h.HikariDataSource:110 - HikariPool-1 - Starting...
23:46:04 INFO  c.z.h.HikariDataSource:123 - HikariPool-1 - Start completed.
23:46:04 INFO  o.h.j.i.u.LogHelper:31 - HHH000204: Processing PersistenceUnitInfo [name: default]
23:46:04 INFO  o.h.Version:44 - HHH000412: Hibernate ORM core version 5.4.29.Final
23:46:04 INFO  o.h.a.c.Version:56 - HCANN000001: Hibernate Commons Annotations {5.1.2.Final}
23:46:04 INFO  o.h.d.Dialect:175 - HHH000400: Using dialect: org.hibernate.dialect.PostgreSQL10Dialect
23:46:04 INFO  o.h.e.t.j.p.i.JtaPlatformInitiator:52 - HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
23:46:04 INFO  o.s.o.j.LocalContainerEntityManagerFactoryBean:437 - Initialized JPA EntityManagerFactory for persistence unit 'default'
Writer
step 1
Reader
23:46:04 INFO  o.s.s.c.ThreadPoolTaskScheduler:181 - Initializing ExecutorService
23:46:04 INFO  o.s.s.c.ThreadPoolTaskScheduler:181 - Initializing ExecutorService 'poolScheduler'
23:46:04 WARN  o.s.b.a.o.j.JpaBaseConfiguration$JpaWebConfiguration:221 - spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
23:46:05 INFO  o.s.s.c.ThreadPoolTaskExecutor:181 - Initializing ExecutorService 'applicationTaskExecutor'
23:46:05 WARN  o.s.b.a.b.JpaBatchConfigurer:57 - JPA does not support custom isolation levels, so locks may not be taken when launching Jobs
23:46:05 INFO  o.s.b.c.r.s.JobRepositoryFactoryBean:185 - No database type set, using meta data indicating: POSTGRES
23:46:05 INFO  o.s.b.c.l.s.SimpleJobLauncher:215 - No TaskExecutor has been set, defaulting to synchronous executor.
23:46:05 INFO  o.s.b.w.e.t.TomcatWebServer:220 - Tomcat started on port(s): 8080 (http) with context path ''
23:46:05 INFO  o.s.s.c.ThreadPoolTaskScheduler:181 - Initializing ExecutorService
Hibernate: 
    select
        cronconfig0_.id as id1_1_0_,
        cronconfig0_.cron_exp as cron_exp2_1_0_ 
    from
        cron_conf cronconfig0_ 
    where
        cronconfig0_.id=?
23:46:05 INFO  c.b.a.Application:61 - Started Application in 3.271 seconds (JVM running for 3.512)
Hibernate: 
    select
        cronconfig0_.id as id1_1_0_,
        cronconfig0_.cron_exp as cron_exp2_1_0_ 
    from
        cron_conf cronconfig0_ 
    where
        cronconfig0_.id=?
23:47:00 INFO  o.s.b.c.l.s.SimpleJobLauncher:146 - Job: [FlowJob: [name=importCrawlingBatch]] launched with the following parameters: [{time=Wed Mar 09 23:47:00 TRT 2022}]
>>> BATCH CRAWLING STARTED <<<
23:47:00 INFO  o.s.b.c.j.SimpleStepHandler:149 - Executing step: [step1]
23:47:00 INFO  o.s.b.c.s.AbstractStep:273 - Step: [step1] executed in 16ms
>>> BATCH  CRAWLING FINISHED <<<

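As the run shows, the read and write functions are not called when the job is launched. They are called only once, during initialization, and never again.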

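Answer: this happened because I had not added the @StepScope annotation to the reader and writer bean methods. Without it, both beans are singletons: Spring calls reader() and writer() once while the application context starts (which is why the println in reader() appears only once, before the first scheduled run), and every launch reuses those same instances. A ListItemReader removes items from its internal list as it reads them, so after the first run it has nothing left to return. With @StepScope, Spring creates a new instance lazily each time the step executes. If you want to learn more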

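import java.util.ArrayList;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.DuplicateKeyException;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // @StepScope: a new reader is created for every step execution, so this
    // method body runs on each scheduled launch instead of once at startup.
    @Bean
    @StepScope
    public ListItemReader<Blog> reader() {
        System.out.println("reader");
        return new ListItemReader<Blog>(new ArrayList<Blog>());
    }

    // Step-scoped as well, so each step execution gets its own writer instance.
    @Bean
    @StepScope
    public JdbcBatchItemWriter<Blog> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Blog>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Blog>())
                .sql("INSERT INTO blog (param, blog_url, blog_title) VALUES (:identity.param,:identity.url, :blogTitle)")
                .dataSource(dataSource).build();
    }

    @Bean
    public Job importCrawlingBatch(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importCrawlingBatch").incrementer(new RunIdIncrementer()).listener(listener)
                .flow(step1).end().build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Blog> writer) {
        return stepBuilderFactory.get("step1")
                .<Blog, Blog>chunk(10)
                // Inside a @Configuration class this goes through the container,
                // so the step receives the step-scoped proxy, not a fixed instance.
                .reader(reader())
                .writer(writer)
                .faultTolerant()
                .skip(DuplicateKeyException.class).skipPolicy(new ItemSkipPolicy())
                .allowStartIfComplete(true).build();
    }
}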
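To see why the scope matters, here is a plain-Java simulation that runs without Spring on the classpath. SimpleListReader is a hypothetical stand-in that mimics how ListItemReader copies its input list and removes each item as it is read, and crawl() is a hypothetical stand-in for the reader() bean method. Reusing one instance (the singleton case) yields items only on the first run, while building a new reader for each step execution (roughly what @StepScope does) yields them every time. This is a sketch of the mechanism under those assumptions, not Spring's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java simulation of singleton vs. step-scoped readers.
// SimpleListReader and crawl() are hypothetical stand-ins, not Spring classes.
public class StepScopeDemo {

    // Mimics ListItemReader: copies the source list, then removes items as they are read.
    static class SimpleListReader<T> {
        private final List<T> items;

        SimpleListReader(List<T> source) {
            this.items = new ArrayList<>(source);
        }

        T read() {
            return items.isEmpty() ? null : items.remove(0);
        }
    }

    static int factoryCalls = 0;

    // Hypothetical stand-in for the reader() bean method (e.g. the crawling code).
    static SimpleListReader<String> crawl() {
        factoryCalls++;
        return new SimpleListReader<>(List.of("blog-1", "blog-2", "blog-3"));
    }

    // Simulates one step execution: drain the reader and count the items read.
    static int runStep(SimpleListReader<String> reader) {
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        return count;
    }

    // Launches the "job" twice; the supplier decides which reader each step gets.
    static int[] runTwice(Supplier<SimpleListReader<String>> readerForStep) {
        return new int[] {runStep(readerForStep.get()), runStep(readerForStep.get())};
    }

    public static void main(String[] args) {
        // Singleton: the factory runs once at startup and the instance is reused.
        SimpleListReader<String> singleton = crawl();
        int[] singletonCounts = runTwice(() -> singleton);

        // Step scope: a fresh reader is built for every step execution.
        int[] scopedCounts = runTwice(StepScopeDemo::crawl);

        System.out.println("singleton: " + singletonCounts[0] + ", " + singletonCounts[1]); // singleton: 3, 0
        System.out.println("step-scoped: " + scopedCounts[0] + ", " + scopedCounts[1]);    // step-scoped: 3, 3
        System.out.println("factory calls: " + factoryCalls);                              // factory calls: 3
    }
}
```

The second singleton run reads nothing even though the step itself still executes, which matches the log above: the step starts and finishes in a few milliseconds without calling the reader's bean method again.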