Spring 批处理重复步骤以永无止境的循环结束

Spring batch repeat step ending up in never ending loop

我有一个 spring 批处理作业,我想执行以下操作...

Step 1 - 
   Tasklet - Create a list of dates, store the list of dates in the job execution context.

Step 2 - 
   JDBC Item Reader - Get list of dates from job execution context.
                      Get element(0) in dates list. Use is as input for jdbc query. 
                      Store element(0) date is job execution context 
                      Remove element(0) date from list of dates
                      Store element(0) date in job execution context                 
   Flat File Item Writer - Get element(0) date from job execution context and use for file name.

Then using a job listener repeat step 2 until no remaining dates in the list of dates.

我已经创建了作业,它在第一次执行第 2 步时工作正常。但是第 2 步没有像我希望的那样重复。我知道这一点,因为当我调试我的代码时,它只会在第 2 步的初始 运行 中断。

然而,它确实继续给我如下消息,就好像它是 运行正在执行第 2 步,即使我知道它不是。

2016-08-10 22:20:57.842  INFO 11784 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Duplicate step [readStgDbAndExportMasterListStep] detected in execution of job=[exportMasterListCsv]. If either step fails, both will be executed again on restart.
2016-08-10 22:20:57.846  INFO 11784 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [readStgDbAndExportMasterListStep]

这是一个永无止境的循环。

有人可以帮我弄清楚为什么我的 stpe 2 只 运行ning 一次吗?

提前致谢

我已经为我的代码添加了两个指向 PasteBin 的链接,以免污染此 post。

http://pastebin.com/QhExNikm(作业配置)

http://pastebin.com/sscKKWRk(通用作业配置)

http://pastebin.com/Nn74zTpS(步骤执行侦听器)

根据你的问题和你的代码,我根据你检索的日期数量(这发生在实际工作开始之前)扣除,你将根据你有日期的次数执行一个步骤。

我建议更改设计。创建一个 java class,它将以列表的形式为您提供日期,并根据该列表动态创建您的步数。像这样:

@EnableBatchProcessing
public class JobConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;  

    @Autowired
    private JobDatesCreator jobDatesCreator;

    @Bean
    public Job executeMyJob() {
        List<Step> steps = new ArrayList<Step>();
        for (String date : jobDatesCreator.getDates()) {
            steps.add(createStep(date));
        }

        return jobBuilderFactory.get("executeMyJob")
                .start(createParallelFlow(steps))
                .end()
                .build();       
    }

    private Step createStep(String date){
        return stepBuilderFactory.get("readStgDbAndExportMasterListStep" + date)
                .chunk(your_chunksize)
                .reader(your_reader)
                .processor(your_processor)
                .writer(your_writer)                                
                .build();       
    }   

    private Flow createParallelFlow(List<Step> steps) {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        // max multithreading = -1, no multithreading = 1, smart size = steps.size()
        taskExecutor.setConcurrencyLimit(1); 

        List<Flow> flows = steps.stream()
                .map(step -> new FlowBuilder<Flow>("flow_" + step.getName()).start(step).build())
                .collect(Collectors.toList());

        return new FlowBuilder<SimpleFlow>("parallelStepsFlow")
                .split(taskExecutor)
                .add(flows.toArray(new Flow[flows.size()]))
                .build();
    }  
}

编辑: 添加了 "jobParameter" 输入(方法也略有不同)

在您的 class 路径的某处添加以下示例 .properties 文件:

sql.statement="select * from awesome"

并将以下注释添加到您的 JobDatesCreator class

@PropertySource("classpath:example.properties")

您也可以提供特定的 sql 语句作为命令行参数。来自 spring 文档:

you can launch with a specific command line switch (e.g. java -jar app.jar --name="Spring").

有关详细信息,请参阅 http://docs.spring.io/spring-boot/docs/current/reference/html/boot-features-external-config.html

获取日期的 class(为什么要为此使用 tasklet?):

@PropertySource("classpath:example.properties")
public class JobDatesCreator {

    @Value("${sql.statement}")
    private String sqlStatement;

    @Autowired
    private CommonExportFromStagingDbJobConfig commonJobConfig; 

    private List<String> dates; 

    @PostConstruct
    private void init(){
        // Execute your logic here for getting the data you need.
        JdbcTemplate jdbcTemplate = new JdbcTemplate(commonJobConfig.onlineStagingDb);
        // acces to your sql statement provided in a property file or as a command line argument
        System.out.println("This is the sql statement I provided in my external property: " + sqlStatement);

        // for now..
        dates = new ArrayList<>();
        dates.add("date 1");
        dates.add("date 2");
    }

    public List<String> getDates() {
        return dates;
    }

    public void setDates(List<String> dates) {
        this.dates = dates;
    }
}

我还注意到您有很多可以很容易地重构的重复代码。现在对于每个作家你有这样的东西:

@Bean
public FlatFileItemWriter<MasterList> division10MasterListFileWriter() {
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(new File(outDir, MerchHierarchyConstants.DIVISION_NO_10 )));
    writer.setHeaderCallback(masterListFlatFileHeaderCallback());
    writer.setLineAggregator(masterListFormatterLineAggregator());
    return writer;
}

考虑改用这样的东西:

public FlatFileItemWriter<MasterList> divisionMasterListFileWriter(String divisionNumber) {
    FlatFileItemWriter<MasterList> writer = new FlatFileItemWriter<>();
    writer.setResource(new FileSystemResource(new File(outDir, divisionNumber )));
    writer.setHeaderCallback(masterListFlatFileHeaderCallback());
    writer.setLineAggregator(masterListFormatterLineAggregator());
    return writer;
}

由于并非所有代码都可用于正确复制您的问题,因此此答案 suggestion/indication 可以解决您的问题。

根据我们在 上的讨论,我试图回答有关如何在实际执行作业之前访问 jobParameter 的问题。

我假设有 restcall 将执行批处理。一般来说,这需要采取以下步骤。 1. 一段代码接收rest调用及其参数 2.创建一个新的springcontext(有方法可以重用现有的上下文并再次启动作业,但是在重用步骤、读取器和写入器时存在一些问题) 3. 启动作业

最简单的解决方案是将从服务接收的作业参数存储为系统-属性,然后在步骤 3 中构建作业时访问此 属性。但这可能会导致如果多个用户同时启动作业,则会出现问题。

还有其他方法可以在加载时将参数传递给 springcontext。但这取决于您设置上下文的方式。 例如,如果您在步骤 2 中直接使用 SpringBoot,则可以编写如下方法:

private int startJob(Properties jobParamsAsProps) {
  SpringApplication springApp = new SpringApplication(.. my config classes ..);
  springApp.setDefaultProperties(jobParamsAsProps);

  ConfigurableApplicationContext context = springApp.run();
  ExitCodeGenerator exitCodeGen = context.getBean(ExitCodeGenerator.class);
  int code = exitCodeGen.getExitCode();
  context.close();
  return cod;
}

这样,您就可以使用标准值或配置属性注释正常访问属性。