Why is the itemReader always sending the exact same value to CustomItemProcessor?

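Why does the itemReader method always send the exact same file name to be processed in CustomItemProcessor?

As far as I understand, because I annotated the reader with @Scope and set the chunk size to more than 1, I expected "return s" to advance to the next value of the String array.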

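Let me clarify my question with a debugging walkthrough of the reader method:

1 - The variable stringArray is filled with 3 file names (f1.txt, f2.txt and f3.txt)

2 - "return s" returns s = f1.txt

3 - "return s" is reached again before the customItemProcessor method is invoked (perfect so far, since chunk = 2)

4 - Looking at s again, it contains f1.txt (not what I expected; I expected f2.txt)

5 and 6 - The processor runs with the same name, f1.txt, both times (it would work correctly if the second pass of "return s" had returned f2.txt)

7 - The writer method works as expected (processedFiles contains the two names processed in customItemProcessor, f1.txt and f1.txt, that is, the same name twice because it was processed twice)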

CustomItemReader

public class CustomItemReader implements ItemReader<String> {
    @Override
    public String read() throws Exception, UnexpectedInputException,
            ParseException, NonTransientResourceException {
        String[] stringArray;

        try (Stream<Path> stream = Files.list(Paths.get(env
                .getProperty("my.path")))) {
            stringArray = stream.map(String::valueOf)
                    .filter(path -> path.endsWith("out"))
                    .toArray(size -> new String[size]);
        }

        //*** the problem is here
        //every turn s variable receives the first file name from the stringArray
        if (stringArray.length > 0) {
            for (String s : stringArray) {
                return s;
            }
        } else {
            log.info("read method - no file found");
            return null;
        }
        return null;
    }
}

CustomItemProcessor

public class CustomItemProcessor implements ItemProcessor<String, String> {

    @Override
    public String process(String singleFileToProcess) throws Exception {
        log.info("process method: " + singleFileToProcess);
        return singleFileToProcess;
    }
}

CustomItemWriter

public class CustomItemWriter implements ItemWriter<String> {

    private static final Logger log = LoggerFactory
            .getLogger(CustomItemWriter.class);

    @Override
    public void write(List<? extends String> processedFiles) throws Exception {

        processedFiles.stream().forEach(
                processedFile -> log.info("**** write method"
                        + processedFile.toString()));

        FileSystem fs = FileSystems.getDefault();

        for (String s : processedFiles) {
            Files.deleteIfExists(fs.getPath(s));
        }
    }
}

Configuration

@Configuration
@ComponentScan(...
@EnableBatchProcessing
@EnableScheduling
@PropertySource(...
public class BatchConfig {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobRepository jobRepository;

    @Bean
    public TaskExecutor getTaskExecutor() {
        return new TaskExecutor() {
            @Override
            public void execute(Runnable task) {
            }
        };
    }

    // As far as I can see, the chunk size reflects how many times the reader is triggered before the processor is triggered
    @Bean
    public Step step1(ItemReader<String> reader,
            ItemProcessor<String, String> processor, ItemWriter<String> writer) {
        return stepBuilderFactory.get("step1").<String, String> chunk(2)
                .reader(reader).processor(processor).writer(writer)
                .allowStartIfComplete(true).build();
    }

    @Bean
    @Scope
    public ItemReader<String> reader() {
        return new CustomItemReader();
    }

    @Bean
    public ItemProcessor<String, String> processor() {
        return new CustomItemProcessor();
    }

    @Bean
    public ItemWriter<String> writer() {
        return new CustomItemWriter();
    }

    @Bean
    public Job job(Step step1) throws Exception {
        return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build();
    }
}

Scheduler

@Component
public class QueueScheduler {

    private static final Logger log = LoggerFactory
            .getLogger(QueueScheduler.class);

    private Job job;
    private JobLauncher jobLauncher;

    @Autowired
    public QueueScheduler(JobLauncher jobLauncher, @Qualifier("job") Job job) {
        this.job = job;
        this.jobLauncher = jobLauncher;
    }

    @Scheduled(fixedRate = 60000)
    public void runJob() {
        try {
            jobLauncher.run(job, new JobParameters());
        } catch (Exception ex) {
            log.info(ex.getMessage());
        }
    }
}

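Your problem is that you are relying on an internal loop to iterate over the items, instead of letting Spring Batch do that for you by calling ItemReader#read multiple times. Because your read() rebuilds the array and returns from the first iteration of the loop on every call, it always yields the first file name.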
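To make the difference concrete, here is a minimal plain-Java sketch with no Spring Batch involved. StatelessReader, StatefulReader and readChunk are hypothetical names made up for this illustration, and readChunk is only a simplified imitation of how a chunk gets filled (call read() up to chunk-size times, stop on null), not the framework's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the question's reader: every call starts over,
// so the loop returns during its first iteration each time.
class StatelessReader {
    private final String[] files;

    StatelessReader(String... files) {
        this.files = files;
    }

    String read() {
        for (String s : files) {
            return s; // always files[0]
        }
        return null;
    }
}

// Hypothetical stand-in for a reader that remembers its position between calls.
class StatefulReader {
    private final String[] files;
    private int next = 0;

    StatefulReader(String... files) {
        this.files = files;
    }

    String read() {
        return next < files.length ? files[next++] : null;
    }
}

class ChunkDemo {
    // Simplified imitation of filling one chunk: call read() up to
    // chunkSize times and stop early when it returns null.
    static List<String> readChunk(Supplier<String> read, int chunkSize) {
        List<String> chunk = new ArrayList<>();
        for (int i = 0; i < chunkSize; i++) {
            String item = read.get();
            if (item == null) {
                break;
            }
            chunk.add(item);
        }
        return chunk;
    }

    public static void main(String[] args) {
        StatelessReader stateless = new StatelessReader("f1.txt", "f2.txt", "f3.txt");
        StatefulReader stateful = new StatefulReader("f1.txt", "f2.txt", "f3.txt");

        System.out.println(readChunk(stateless::read, 2)); // [f1.txt, f1.txt]
        System.out.println(readChunk(stateful::read, 2));  // [f1.txt, f2.txt]
        System.out.println(readChunk(stateful::read, 2));  // [f3.txt]
    }
}
```

The stateless variant reproduces what you saw in the debugger: two reads per chunk, both returning f1.txt. The stateful variant advances because the position lives in a field rather than in a loop that is restarted on every call.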

I would suggest changing your reader to something like this:

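import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;

public class JimsItemReader implements ItemStreamReader<String> {
   // Assumption: env is injected, as it appears to be in the question's reader.
   @Autowired
   private Environment env;

   private String[] items = new String[0];
   private int curIndex = -1;

   @Override
   public void open(ExecutionContext ec) {
       // Resume from the saved position on restart; otherwise start before the first item.
       curIndex = ec.getInt("curIndex", -1);

       // List the directory once per step execution instead of on every read().
       try (Stream<Path> stream = Files.list(Paths.get(env.getProperty("my.path")))) {
           items = stream.map(String::valueOf)
                         .filter(path -> path.endsWith("out"))
                         .toArray(String[]::new);
       } catch (IOException e) {
           throw new ItemStreamException("Could not list files in my.path", e);
       }
   }

   @Override
   public void update(ExecutionContext ec) {
       // Save the index of the last item handed out.
       ec.putInt("curIndex", curIndex);
   }

   @Override
   public void close() {
       // Nothing to release.
   }

   @Override
   public String read() {
       // Check the next index before advancing, so the last read cannot run past the array.
       if (curIndex + 1 < items.length) {
           curIndex++;
           return items[curIndex];
       }
       return null; // null tells Spring Batch the input is exhausted
   }
}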

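The example above should step through the array items as they are read. It should also be restartable: because the index is stored in the ExecutionContext, a job that is restarted after a failure picks up where it left off.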
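The restart behaviour can be sketched in plain Java as well. This is only an illustration under stated assumptions: RestartableReader and RestartDemo are hypothetical names, and a plain Map stands in for Spring Batch's ExecutionContext, so none of this is the framework's API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reader with an open/update/read life cycle,
// using a Map as a stand-in for ExecutionContext.
class RestartableReader {
    private final String[] items;
    private int curIndex = -1;

    RestartableReader(String... items) {
        this.items = items;
    }

    // Restore the saved position, or start before the first item.
    void open(Map<String, Integer> ctx) {
        curIndex = ctx.getOrDefault("curIndex", -1);
    }

    // Save the index of the last item handed out.
    void update(Map<String, Integer> ctx) {
        ctx.put("curIndex", curIndex);
    }

    String read() {
        if (curIndex + 1 < items.length) {
            curIndex++;
            return items[curIndex];
        }
        return null;
    }
}

class RestartDemo {
    public static void main(String[] args) {
        Map<String, Integer> ctx = new HashMap<>();

        RestartableReader first = new RestartableReader("f1.txt", "f2.txt", "f3.txt");
        first.open(ctx);
        System.out.println(first.read()); // f1.txt
        System.out.println(first.read()); // f2.txt
        first.update(ctx);                // position saved at the chunk boundary

        // Simulated restart: a fresh instance opened with the saved context.
        RestartableReader second = new RestartableReader("f1.txt", "f2.txt", "f3.txt");
        second.open(ctx);
        System.out.println(second.read()); // f3.txt
        System.out.println(second.read()); // null
    }
}
```

The second instance starts at f3.txt because the saved index points at the last item already handed out, not at the next one to read.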