如何让编写器为 spring 批处理中的每个作业实例启动
How to make writer initiated for every job instance in spring batch
我正在编写 spring 批处理作业。我正在使用 KafkaClientWriter extends AbstractItemStreamItemWriter<ProducerMessage>
实现自定义编写器
我有一些字段需要对每个实例都是唯一的。但是我可以看到这个 class 只启动了一次。休息工作有相同的作家实例 class。
我的自定义阅读器和处理器正在为每项工作启动。
以下是我的工作配置。我怎样才能为 writer 实现相同的行为?
@Bean
@Scope("job")
public ZipMultiResourceItemReader reader(@Value("#{jobParameters[fileName]}") String fileName, @Value("#{jobParameters[s3SourceFolderPrefix]}") String s3SourceFolderPrefix, @Value("#{jobParameters[timeStamp]}") long timeStamp, com.fastretailing.catalogPlatformSCMProducer.service.ConfigurationService confService) {
FlatFileItemReader faltFileReader = new FlatFileItemReader();
ZipMultiResourceItemReader zipReader = new ZipMultiResourceItemReader();
Resource[] resArray = new Resource[1];
resArray[0] = new FileSystemResource(new File(fileName));
zipReader.setArchives(resArray);
DefaultLineMapper<ProducerMessage> lineMapper = new DefaultLineMapper<ProducerMessage>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
CSVFieldMapper csvFieldMapper = new CSVFieldMapper(fileName, s3SourceFolderPrefix, timeStamp, confService);
lineMapper.setFieldSetMapper(csvFieldMapper);
faltFileReader.setLineMapper(lineMapper);
zipReader.setDelegate(faltFileReader);
return zipReader;
}
@Bean
@Scope("job")
public ItemProcessor<ProducerMessage, ProducerMessage> processor(@Value("#{jobParameters[timeStamp]}") long timeStamp) {
ProducerProcessor processor = new ProducerProcessor();
processor.setS3FileTimeStamp(timeStamp);
return processor;
}
@Bean
@ConfigurationProperties
public ItemWriter<ProducerMessage> writer() {
return new KafkaClientWriter();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemReader reader, ItemWriter writer,
ItemProcessor processor, @Value("${reader.chunkSize}")
int chunkSize) {
LOGGER.info("Step configuration loaded with chunk size {}", chunkSize);
return stepBuilderFactory.get("step1")
.chunk(chunkSize).reader(reader)
.processor(processor).writer(writer)
.build();
}
@Bean
public StepScope stepScope() {
final StepScope stepScope = new StepScope();
stepScope.setAutoProxy(true);
return stepScope;
}
@Bean
public JobScope jobScope() {
final JobScope jobScope = new JobScope();
return jobScope;
}
@Bean
public Configuration configuration() {
return new Configuration();
}
我试着让作家有工作范围。但在那种情况下 open 不会被调用。这是我正在做一些初始化的地方。
当使用基于 java 的配置和作用域代理时,会检测到 return 类型的方法并为此创建代理。因此,当您 return ItemWriter
时,您将获得一个仅实现 ItemWriter
的 JDK 代理,而您的 open
方法位于 ItemStream
接口上。因为该接口不包含在代理中,所以无法调用该方法。
将 return 类型更改为 KafkaClientWriter
或 ItemStreamWriter< ProducerMessage>
(假设 KafkaCLientWriter
实现了该方法)。接下来添加 @Scope("job")
并且您应该使用适当范围的编写器再次调用 open
方法。
我正在编写 spring 批处理作业。我正在使用 KafkaClientWriter extends AbstractItemStreamItemWriter<ProducerMessage>
我有一些字段需要对每个实例都是唯一的。但是我可以看到这个 class 只启动了一次。休息工作有相同的作家实例 class。 我的自定义阅读器和处理器正在为每项工作启动。 以下是我的工作配置。我怎样才能为 writer 实现相同的行为?
@Bean
@Scope("job")
public ZipMultiResourceItemReader reader(@Value("#{jobParameters[fileName]}") String fileName, @Value("#{jobParameters[s3SourceFolderPrefix]}") String s3SourceFolderPrefix, @Value("#{jobParameters[timeStamp]}") long timeStamp, com.fastretailing.catalogPlatformSCMProducer.service.ConfigurationService confService) {
FlatFileItemReader faltFileReader = new FlatFileItemReader();
ZipMultiResourceItemReader zipReader = new ZipMultiResourceItemReader();
Resource[] resArray = new Resource[1];
resArray[0] = new FileSystemResource(new File(fileName));
zipReader.setArchives(resArray);
DefaultLineMapper<ProducerMessage> lineMapper = new DefaultLineMapper<ProducerMessage>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
CSVFieldMapper csvFieldMapper = new CSVFieldMapper(fileName, s3SourceFolderPrefix, timeStamp, confService);
lineMapper.setFieldSetMapper(csvFieldMapper);
faltFileReader.setLineMapper(lineMapper);
zipReader.setDelegate(faltFileReader);
return zipReader;
}
@Bean
@Scope("job")
public ItemProcessor<ProducerMessage, ProducerMessage> processor(@Value("#{jobParameters[timeStamp]}") long timeStamp) {
ProducerProcessor processor = new ProducerProcessor();
processor.setS3FileTimeStamp(timeStamp);
return processor;
}
@Bean
@ConfigurationProperties
public ItemWriter<ProducerMessage> writer() {
return new KafkaClientWriter();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory,
ItemReader reader, ItemWriter writer,
ItemProcessor processor, @Value("${reader.chunkSize}")
int chunkSize) {
LOGGER.info("Step configuration loaded with chunk size {}", chunkSize);
return stepBuilderFactory.get("step1")
.chunk(chunkSize).reader(reader)
.processor(processor).writer(writer)
.build();
}
@Bean
public StepScope stepScope() {
final StepScope stepScope = new StepScope();
stepScope.setAutoProxy(true);
return stepScope;
}
@Bean
public JobScope jobScope() {
final JobScope jobScope = new JobScope();
return jobScope;
}
@Bean
public Configuration configuration() {
return new Configuration();
}
我试着让作家有工作范围。但在那种情况下 open 不会被调用。这是我正在做一些初始化的地方。
当使用基于 java 的配置和作用域代理时,会检测到 return 类型的方法并为此创建代理。因此,当您 return ItemWriter
时,您将获得一个仅实现 ItemWriter
的 JDK 代理,而您的 open
方法位于 ItemStream
接口上。因为该接口不包含在代理中,所以无法调用该方法。
将 return 类型更改为 KafkaClientWriter
或 ItemStreamWriter< ProducerMessage>
(假设 KafkaCLientWriter
实现了该方法)。接下来添加 @Scope("job")
并且您应该使用适当范围的编写器再次调用 open
方法。