Why is the itemReader always sending the exact same value to CustomItemProcessor?
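Why does the itemReader method always send the exact same file name to be processed in CustomItemProcessor?
As far as I understand, since I annotated the reader with @Scope and set the chunk size to more than 1, I expected "return s" to advance to the next value of the String array.
Let me clarify my question with a debugging walk-through of the reader method:
1 - The variable stringArray is populated with 3 file names (f1.txt, f2.txt and f3.txt)
2 - "return s" is reached with s = f1.txt
3 - "return s" is reached again before the customItemProcessor method is invoked (perfect so far, since chunk = 2)
4 - Inspecting s again, it still contains f1.txt (not what I expected; I expected f2.txt)
5 and 6 - The processor runs twice with the same name, f1.txt (it would work correctly if the second "return s" had returned f2.txt)
7 - The writer method works as expected (processedFiles contains the two names processed in customItemProcessor, f1.txt and f1.txt, because the same name was processed twice)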
CustomItemReader
public class CustomItemReader implements ItemReader<String> {
@Override
public String read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
String[] stringArray;
try (Stream<Path> stream = Files.list(Paths.get(env
.getProperty("my.path")))) {
stringArray = stream.map(String::valueOf)
.filter(path -> path.endsWith("out"))
.toArray(size -> new String[size]);
}
//*** the problem is here
//every turn s variable receives the first file name from the stringArray
if (stringArray.length > 0) {
for (String s : stringArray) {
return s;
}
} else {
log.info("read method - no file found");
return null;
}
return null;
}
}
CustomItemProcessor
public class CustomItemProcessor implements ItemProcessor<String , String> {
@Override
public String process(String singleFileToProcess) throws Exception {
log.info("process method: " + singleFileToProcess);
return singleFileToProcess;
}
}
CustomItemWriter
public class CustomItemWriter implements ItemWriter<String> {
private static final Logger log = LoggerFactory
.getLogger(CustomItemWriter.class);
@Override
public void write(List<? extends String> processedFiles) throws Exception {
processedFiles.stream().forEach(
processedFile -> log.info("**** write method"
+ processedFile.toString()));
FileSystem fs = FileSystems.getDefault();
for (String s : processedFiles) {
Files.deleteIfExists(fs.getPath(s));
}
}
}
Configuration
@Configuration
@ComponentScan(...
@EnableBatchProcessing
@EnableScheduling
@PropertySource(...
public class BatchConfig {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobRepository jobRepository;
@Bean
public TaskExecutor getTaskExecutor() {
return new TaskExecutor() {
@Override
public void execute(Runnable task) {
}
};
}
// The chunk size determines how many times the reader is called before the processor is triggered
@Bean
public Step step1(ItemReader<String> reader,
ItemProcessor<String, String> processor, ItemWriter<String> writer) {
return stepBuilderFactory.get("step1").<String, String> chunk(2)
.reader(reader).processor(processor).writer(writer)
.allowStartIfComplete(true).build();
}
@Bean
@Scope
public ItemReader<String> reader() {
return new CustomItemReader();
}
@Bean
public ItemProcessor<String, String> processor() {
return new CustomItemProcessor();
}
@Bean
public ItemWriter<String> writer() {
return new CustomItemWriter();
}
@Bean
public Job job(Step step1) throws Exception {
return jobBuilderFactory.get("job1").incrementer(new RunIdIncrementer()).start(step1).build();
}
}
Scheduler
@Component
public class QueueScheduler {
private static final Logger log = LoggerFactory
.getLogger(QueueScheduler.class);
private Job job;
private JobLauncher jobLauncher;
@Autowired
public QueueScheduler(JobLauncher jobLauncher, @Qualifier("job") Job job){
this.job = job;
this.jobLauncher = jobLauncher;
}
@Scheduled(fixedRate=60000)
public void runJob(){
try{
jobLauncher.run(job, new JobParameters());
}catch(Exception ex){
log.info(ex.getMessage());
}
}
}
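Your problem is that you rely on an internal loop to iterate over the items, instead of letting Spring Batch do it for you by calling ItemReader#read multiple times. Every call to read() lists the directory again and restarts the for loop, so "return s" exits on the first element each time.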
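To make the difference concrete, here is a minimal plain-Java sketch with no Spring dependencies. The names ReadLoopDemo, statelessRead, CursorReader and fillChunk are made up for illustration, and fillChunk is only a rough stand-in for how a chunk-oriented step calls read() repeatedly; it is not the framework's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class ReadLoopDemo {

    /** Mirrors the question's reader: the loop starts over on every call. */
    public static String statelessRead(String[] files) {
        for (String s : files) {
            return s; // leaves the method on the first iteration, every time
        }
        return null;
    }

    /** Keeps its position in a field, so each call returns the next item. */
    public static class CursorReader {
        private final String[] files;
        private int next = 0;

        public CursorReader(String[] files) {
            this.files = files;
        }

        public String read() {
            return next < files.length ? files[next++] : null;
        }
    }

    /** Hypothetical stand-in for the step: call read() up to chunkSize times. */
    public static List<String> fillChunk(Supplier<String> read, int chunkSize) {
        List<String> chunk = new ArrayList<>();
        for (int i = 0; i < chunkSize; i++) {
            String item = read.get();
            if (item == null) {
                break; // null means the input is exhausted
            }
            chunk.add(item);
        }
        return chunk;
    }

    public static void main(String[] args) {
        String[] files = {"f1.txt", "f2.txt", "f3.txt"};

        // Stateless reader: prints [f1.txt, f1.txt]
        System.out.println(fillChunk(() -> statelessRead(files), 2));

        // Stateful reader: prints [f1.txt, f2.txt] and then [f3.txt]
        CursorReader reader = new CursorReader(files);
        System.out.println(fillChunk(reader::read, 2));
        System.out.println(fillChunk(reader::read, 2));
    }
}
```

The first line of output matches what you saw in the debugger: with chunk = 2, the same first file name is handed to the processor twice.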
I suggest changing your reader to something like this:
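public class JimsItemReader implements ItemStreamReader<String> {
    // Assumption: a Spring Environment is injected, matching the env used in the question
    @Autowired
    private Environment env;
    private String[] items = new String[0];
    // Index of the last item returned; -1 means nothing has been read yet
    private int curIndex = -1;

    @Override
    public void open(ExecutionContext ec) throws ItemStreamException {
        // Resume from the saved position on restart, or start from the beginning
        curIndex = ec.getInt("curIndex", -1);
        try (Stream<Path> stream = Files.list(Paths.get(env.getProperty("my.path")))) {
            // List the files once per step execution and keep them in the field
            items = stream.map(String::valueOf)
                    .filter(path -> path.endsWith("out"))
                    .sorted() // Files.list gives no ordering guarantee; sort so the index is stable
                    .toArray(String[]::new);
        } catch (IOException e) {
            throw new ItemStreamException("Could not list files in my.path", e);
        }
    }

    @Override
    public void update(ExecutionContext ec) {
        ec.putInt("curIndex", curIndex);
    }

    @Override
    public void close() {
        // Nothing to release; the directory stream is already closed in open()
    }

    @Override
    public String read() {
        // Advance only while another item exists, then return it
        if (curIndex + 1 < items.length) {
            curIndex++;
            return items[curIndex];
        }
        // null tells Spring Batch that the input is exhausted
        return null;
    }
}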
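The example above should step through the array items one at a time as they are read. It should also be restartable, because the index is stored in the ExecutionContext, so if the job is restarted after a failure, it picks up where it left off.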
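The restart behaviour can be sketched without Spring as well. In the snippet below a plain Map stands in for the ExecutionContext, and RestartDemo, CheckpointedReader and run are hypothetical names used only for this illustration. It assumes the saved index survives between the two runs, which in a real job is the job repository's responsibility.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestartDemo {

    /** Cursor-based reader; a Map plays the role of the ExecutionContext. */
    public static class CheckpointedReader {
        private final String[] items;
        private int curIndex = -1; // index of the last item returned

        public CheckpointedReader(String[] items) {
            this.items = items;
        }

        public void open(Map<String, Integer> ctx) {
            // Restore the saved position, or start before the first item
            curIndex = ctx.getOrDefault("curIndex", -1);
        }

        public void update(Map<String, Integer> ctx) {
            ctx.put("curIndex", curIndex);
        }

        public String read() {
            return curIndex + 1 < items.length ? items[++curIndex] : null;
        }
    }

    /** Opens a fresh reader, reads at most maxItems, then saves the position. */
    public static List<String> run(String[] files, Map<String, Integer> ctx, int maxItems) {
        CheckpointedReader reader = new CheckpointedReader(files);
        reader.open(ctx);
        List<String> read = new ArrayList<>();
        String item;
        while (read.size() < maxItems && (item = reader.read()) != null) {
            read.add(item);
        }
        reader.update(ctx);
        return read;
    }

    public static void main(String[] args) {
        String[] files = {"f1.txt", "f2.txt", "f3.txt"};
        Map<String, Integer> ctx = new HashMap<>();

        // First run stops after two items, as if the job failed there
        System.out.println(run(files, ctx, 2)); // [f1.txt, f2.txt]

        // Second run uses a new reader but the same context, so it resumes
        System.out.println(run(files, ctx, 2)); // [f3.txt]
    }
}
```

One design point to keep in mind: the saved index is only meaningful if the file list is the same, and in the same order, when the job restarts.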