Spring Batch - My batch seems to be executing two steps at the same time?

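I really don't understand what is going on. I'm studying Spring Batch, and for certain reasons I want to execute two steps, one after the other.

Please don't mind what the steps currently do; just keep in mind that I want to execute two steps in sequence.

Here is the code: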

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@EnableBatchProcessing
public class JobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    
    private List<Employee> employeesToSave = new ArrayList<Employee>();


    public JsonItemReader<Employee> jsonReader() {

        
        System.out.println("Try to read JSON");
        
        final ObjectMapper mapper = new ObjectMapper();

        final JacksonJsonObjectReader<Employee> jsonObjectReader = new JacksonJsonObjectReader<>(
                Employee.class);
        jsonObjectReader.setMapper(mapper);

        return new JsonItemReaderBuilder<Employee>().jsonObjectReader(jsonObjectReader)
                .resource(new ClassPathResource("input.json"))
                .name("myReader")
                .build();

    }
    

    public ListItemReader<Employee> listReader() {
        
        System.out.println("Read from list");
        
        return new ListItemReader<Employee>(employeesToSave);

    }
    

    public ItemProcessor<Employee,Employee> filterProcessor() {
        return employee -> {
            
            System.out.println("Processing JSON");
            
            return employee;
        };
    }

    public ItemWriter<Employee> filterWriter() {

        return listEmployee -> {
            
            employeesToSave.addAll(listEmployee);
            System.out.println("Save on list " + listEmployee.toString());
                        
        };

    }

    public ItemWriter<Employee> insertToDBWriter() {

        System.out.println("Try to save on DB");
        return listEmployee -> {

            System.out.println("Save on DB " + listEmployee.toString());
                        
        };

    }

    public Step filterStep() {
        
        StepBuilder stepBuilder = stepBuilderFactory.get("filterStep");
        SimpleStepBuilder<Employee, Employee> simpleStepBuilder = stepBuilder.chunk(5);
        return simpleStepBuilder.reader(jsonReader()).processor(filterProcessor()).writer(filterWriter()).build();
    }

    public Step insertToDBStep() {
        
        StepBuilder stepBuilder = stepBuilderFactory.get("insertToDBStep");
        SimpleStepBuilder<Employee, Employee> simpleStepBuilder = stepBuilder.chunk(5);
        return simpleStepBuilder.reader(listReader()).writer(insertToDBWriter()).build();
    }

    @Bean
    public Job myJob(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {

        return jobBuilderFactory.get("myJob").incrementer(new RunIdIncrementer())
                .start(filterStep())
                .next(insertToDBStep())
                .build();
    }
}



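Why doesn't insertToDBStep start at the end of filterStep? It actually looks as if both steps were running at the same time. And why does the job appear to start working only after the Root WebApplicationContext has been initialized?

Here is the output.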

2022-05-23 15:40:49.418  INFO 14008 --- [  restartedMain] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 1024 ms
Try to read JSON
Read from list
Try to save on DB
2022-05-23 15:40:49.882  INFO 14008 --- [  restartedMain] o.s.b.d.a.OptionalLiveReloadServer       : LiveReload server is running on port 35729
2022-05-23 15:40:49.917  INFO 14008 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2022-05-23 15:40:49.926  INFO 14008 --- [  restartedMain] c.marco.firstbatch.TestBatchApplication  : Started TestBatchApplication in 1.985 seconds (JVM running for 2.789)
2022-05-23 15:40:49.927  INFO 14008 --- [  restartedMain] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2022-05-23 15:40:49.928  WARN 14008 --- [  restartedMain] o.s.b.c.c.a.DefaultBatchConfigurer       : No datasource was provided...using a Map based JobRepository
2022-05-23 15:40:49.928  WARN 14008 --- [  restartedMain] o.s.b.c.c.a.DefaultBatchConfigurer       : No transaction manager was provided, using a ResourcelessTransactionManager
2022-05-23 15:40:49.943  INFO 14008 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
2022-05-23 15:40:49.972  INFO 14008 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJob]] launched with the following parameters: [{run.id=1}]
2022-05-23 15:40:50.003  INFO 14008 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [filterStep]
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@958d6e7, com.marco.firstbatch.Employee@464d17f8, com.marco.firstbatch.Employee@705520ac, com.marco.firstbatch.Employee@1a9f8e93, com.marco.firstbatch.Employee@55bf8cc9]
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@55d706c0, com.marco.firstbatch.Employee@1bc46dd4]
2022-05-23 15:40:50.074  INFO 14008 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [filterStep] executed in 70ms
2022-05-23 15:40:50.081  INFO 14008 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [insertToDBStep]
2022-05-23 15:40:50.084  INFO 14008 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [insertToDBStep] executed in 3ms
2022-05-23 15:40:50.088  INFO 14008 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [SimpleJob: [name=myJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 96ms

Thanks in advance.

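The steps are executed correctly, in sequence. You are putting System.out.println statements in two kinds of places:

  • In the bean definition methods, which the Spring Framework executes when it configures the application context
  • In the code of the batch artifacts (item processor, item writer), which Spring Batch calls when it runs your job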
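
The difference between the two kinds of places can be reproduced without Spring at all. The following is a minimal plain-Java sketch, not your actual configuration: ConfigTimeVsRunTime, LOG and the String items are hypothetical stand-ins, and a Consumer plays the role of the ItemWriter. The first statement of the factory method runs as soon as the method is called, while the lambda body runs only when the returned object is invoked later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the posted configuration class; no Spring involved.
class ConfigTimeVsRunTime {
    // Collects messages in the order they are produced.
    static final List<String> LOG = new ArrayList<>();

    // Plays the role of insertToDBWriter(): the first statement runs when the
    // method is called, the lambda body runs only when the writer is invoked.
    static Consumer<List<String>> insertToDbWriter() {
        LOG.add("Try to save on DB");                    // "configuration time"
        return items -> LOG.add("Save on DB " + items);  // "run time"
    }

    public static void main(String[] args) {
        Consumer<List<String>> writer = insertToDbWriter(); // the step is being built
        LOG.add("Executing step");                          // the job starts
        writer.accept(Arrays.asList("a", "b"));             // a chunk is written
        LOG.forEach(System.out::println);
    }
}
```

The message from the method body appears before "Executing step", and the message from the lambda appears after it, which is the same pattern as in your log.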

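In your case, the Spring Framework will call the following bean definition methods to define the first step, filterStep():

  • jsonReader(): prints Try to read JSON. The file is not read at this point; only the JSON reader bean is defined. A more accurate log message would be: json reader bean created.
  • listReader(): prints Read from list. Likewise, no reading has started yet. A more accurate log message would be: list reader bean created.
  • filterProcessor(): prints nothing. The log statement is inside the ItemProcessor#process method, which Spring Batch calls at run time, not at this point of configuration time.
  • filterWriter(): same here; the print statement is inside the write method, which is called at run time, not at configuration time.

This results in the following output for filterStep():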

Try to read JSON
Read from list

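The Spring Framework then moves on to define the next step, insertToDBStep(). To do so, it calls the following methods in order, according to your step definition:

  • listReader(): this bean has already been defined, and Spring reuses the same instance (by default, Spring beans are singletons). Hence this method produces no output.
  • insertToDBWriter(): prints Try to save on DB. Again, nothing is actually saved to the database here. A more accurate log message would be insertToDBWriter bean created (or, more precisely, attempting to create insertToDBWriter bean, in case the code that follows throws an exception).

Your cumulative output at this point is: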

Try to read JSON
Read from list
Try to save on DB

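At this point, the Spring Framework has finished configuring the application context, and Spring Batch takes over and starts the job. The actual processing of filterStep() begins:

  • The reader has no output in its read method.
  • The processor prints Processing JSON
  • The writer prints Save on list ...

You appear to have two chunks (the first with 5 items, the second with 2), which results in the following output: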

2022-05-23 15:40:50.003  INFO 14008 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [filterStep]
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@958d6e7, com.marco.firstbatch.Employee@464d17f8, com.marco.firstbatch.Employee@705520ac, com.marco.firstbatch.Employee@1a9f8e93, com.marco.firstbatch.Employee@55bf8cc9]
Processing JSON
Processing JSON
Save on list [com.marco.firstbatch.Employee@55d706c0, com.marco.firstbatch.Employee@1bc46dd4]
2022-05-23 15:40:50.074  INFO 14008 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [filterStep] executed in 70ms
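
The chunk-oriented flow behind this output can be approximated in plain Java. This is a simplified sketch under stated assumptions, not Spring Batch's actual implementation: ChunkLoopSketch, runStep and LOG are hypothetical names, an Iterator stands in for the item reader, and plain strings stand in for Employee objects. With 7 items and a chunk size of 5 it yields one chunk of 5 and one chunk of 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of a chunk-oriented step; no Spring involved.
class ChunkLoopSketch {
    // Collects messages in the order they are produced.
    static final List<String> LOG = new ArrayList<>();

    // Reads up to chunkSize items, processes each one, then writes the chunk.
    static void runStep(Iterator<String> reader, int chunkSize) {
        while (reader.hasNext()) {
            List<String> chunk = new ArrayList<>();
            while (chunk.size() < chunkSize && reader.hasNext()) {
                chunk.add(reader.next());           // read phase
            }
            for (String item : chunk) {
                LOG.add("Processing " + item);      // process phase
            }
            LOG.add("Save on list " + chunk);       // write phase, once per chunk
        }
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("e1", "e2", "e3", "e4", "e5", "e6", "e7");
        runStep(items.iterator(), 5);
        LOG.forEach(System.out::println);
    }
}
```

The writer message appears once per chunk, after the processor messages for that chunk, which matches the five-then-two grouping in the log above.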

Then the next step starts executing, and you get the following output:

2022-05-23 15:40:50.081  INFO 14008 --- [  restartedMain] o.s.batch.core.job.SimpleStepHandler     : Executing step: [insertToDBStep]
2022-05-23 15:40:50.084  INFO 14008 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [insertToDBStep] executed in 3ms

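At this point you may wonder why no items are written by insertToDBWriter() (that is, why there is no Save on DB ... log). This is because listReader() is a singleton bean that you use in both steps, so when the second step calls its read method it returns null: the same instance is used, and its list of items was already exhausted in step 1. With no items to process, the step ends immediately. If you want to re-read the items from the list in the second step, you can annotate the reader method with @StepScope. This creates a separate reader instance for each step.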
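
To make the last point concrete, here is a minimal plain-Java sketch of a reader that drains its list, shared across two steps versus created fresh for the second one. It only models the idea: SharedVsPerStepReader, DrainingReader and runStep are hypothetical names, and no Spring class is used. In Spring Batch itself the counterpart would be declaring listReader() as a bean annotated with @StepScope; the methods in the posted configuration carry no @Bean annotation, so I am assuming that would be added as well.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of "one shared reader" versus "one reader per step".
class SharedVsPerStepReader {

    // Minimal list-backed reader: hands out items one by one, then null.
    static class DrainingReader {
        private final List<String> items;

        DrainingReader(List<String> source) {
            this.items = new ArrayList<>(source); // private copy of the source list
        }

        String read() {
            return items.isEmpty() ? null : items.remove(0);
        }
    }

    // Stand-in for a step: reads until the reader returns null, counts the items.
    static int runStep(DrainingReader reader) {
        int count = 0;
        while (reader.read() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");

        DrainingReader shared = new DrainingReader(data);
        System.out.println("step 1, shared reader: " + runStep(shared));
        System.out.println("step 2, shared reader: " + runStep(shared));

        // A fresh instance per step sees the full list again.
        System.out.println("step 2, fresh reader:  " + runStep(new DrainingReader(data)));
    }
}
```

The second run over the shared reader finds nothing left, while a fresh instance reads all three items again. As far as I know, ListItemReader also copies the list it is given in its constructor, so a reader created at configuration time, while employeesToSave is still empty, would stay empty; a step-scoped reader is created when its step starts, after filterStep has filled the list.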