@BeforeStep not being called in AsyncProcessor
I have been using a synchronous ItemProcessor and ItemWriter, but I have now moved to asynchronous ones, as shown in the code below:
@Bean
public Job importFraudCodeJob(Step computeFormFileToDB) {
    return jobBuilderFactory.get("Import-Entities-Risk-Codes")
            .incrementer(new RunIdIncrementer())
            .listener(notificationExecutionListener)
            .start(computeFormFileToDB)
            .build();
}

@Bean
public Step computeFormFileToDB(ItemReader<EntityRiskCodesDto> entityRiskCodeFileReader) {
    return stepBuilderFactory.get("ImportFraudCodesStep")
            // generic types aligned with the reader and the async processor declared below
            .<EntityRiskCodesDto, Future<EntityRiskCodes>>chunk(chunkSize)
            .reader(entityRiskCodeFileReader)
            .processor(asyncProcessor())
            .writer(asyncWriter())
            .faultTolerant()
            .skipPolicy(customSkipPolicy)
            .listener(customStepListener)
            .listener(chunkCounterListener())
            .taskExecutor(taskExecutor())
            .throttleLimit(6)
            .build();
}
In my ItemProcessor, I use @BeforeStep to fetch a value that I stored in the step's ExecutionContext:
@BeforeStep
public void getKey(StepExecution stepExecution) {
    log.info("Fetching batchNumber");
    ExecutionContext context = stepExecution.getExecutionContext();
    this.sequenceNumber = (Integer) context.get("sequenceNumber");
}
Here is the declaration of my AsyncProcessor:
@Bean
public AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes> asyncProcessor() {
    var asyncItemProcessor = new AsyncItemProcessor<EntityRiskCodesDto, EntityRiskCodes>();
    asyncItemProcessor.setDelegate(riskCodeItemProcessor());
    asyncItemProcessor.setTaskExecutor(taskExecutor());
    return asyncItemProcessor;
}
The problem is that the @BeforeStep method above is never called.
How can I get values from the StepExecution and pass them to an asynchronous ItemProcessor or an AsyncItemWriter?
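The reason is that your item processor is a delegate of the async item processor, so it is not automatically registered as a listener. You have to register it manually. Here is an excerpt from the Intercepting Step Execution section of the documentation: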
If the listener is nested inside another component, it needs to be explicitly
registered (as described previously under "Registering ItemStream with a Step").
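The behaviour described in that excerpt can be modelled without Spring. The following is only a simplified sketch, not Spring Batch's actual implementation: `ToyStep`, `WrappingProcessor`, `DelegateProcessor` and `BeforeStepListener` are hypothetical names invented for illustration. It assumes the step inspects only the component handed to it directly, so a delegate hidden inside a wrapper gets its callback only when it is registered explicitly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class ListenerRegistrationSketch {

    /** Hypothetical stand-in for a "before step" listener callback. */
    interface BeforeStepListener {
        void beforeStep(String stepName);
    }

    /** Delegate that wants to know the step name before processing starts. */
    static class DelegateProcessor implements Function<Integer, Integer>, BeforeStepListener {
        String stepName; // stays null unless beforeStep is invoked

        @Override
        public void beforeStep(String stepName) {
            this.stepName = stepName;
        }

        @Override
        public Integer apply(Integer item) {
            return item + 1;
        }
    }

    /** Wrapper that hides its delegate, much like an async processor wraps a plain one. */
    static class WrappingProcessor implements Function<Integer, Integer> {
        private final Function<Integer, Integer> delegate;

        WrappingProcessor(Function<Integer, Integer> delegate) {
            this.delegate = delegate;
        }

        @Override
        public Integer apply(Integer item) {
            return delegate.apply(item);
        }
    }

    /** Toy step: it only sees the top-level processor plus explicitly registered listeners. */
    static class ToyStep {
        private final String name;
        private final Function<Integer, Integer> processor;
        private final List<BeforeStepListener> listeners = new ArrayList<>();

        ToyStep(String name, Function<Integer, Integer> processor) {
            this.name = name;
            this.processor = processor;
            // Only the component passed in directly is inspected; nested delegates are invisible.
            if (processor instanceof BeforeStepListener) {
                listeners.add((BeforeStepListener) processor);
            }
        }

        ToyStep listener(BeforeStepListener listener) {
            listeners.add(listener);
            return this;
        }

        List<Integer> run(List<Integer> items) {
            for (BeforeStepListener listener : listeners) {
                listener.beforeStep(name);
            }
            List<Integer> results = new ArrayList<>();
            for (Integer item : items) {
                results.add(processor.apply(item));
            }
            return results;
        }
    }

    /** Runs the toy step and reports which step name the delegate received, if any. */
    public static String stepNameSeenByDelegate(boolean registerDelegate) {
        DelegateProcessor delegate = new DelegateProcessor();
        ToyStep step = new ToyStep("myStep", new WrappingProcessor(delegate));
        if (registerDelegate) {
            step.listener(delegate); // the explicit registration the excerpt asks for
        }
        step.run(Arrays.asList(0, 1, 2));
        return delegate.stepName;
    }

    public static void main(String[] args) {
        System.out.println("not registered: " + stepNameSeenByDelegate(false)); // null
        System.out.println("registered: " + stepNameSeenByDelegate(true));      // myStep
    }
}
```

Without the explicit `listener(delegate)` call, the delegate's field is never set, which matches the symptom in the question.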
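So in your use case, you need to register the delegate riskCodeItemProcessor() as a listener in your step, and the method annotated with @BeforeStep should then be called. Here is a quick example: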
import java.util.Arrays;
import java.util.concurrent.Future;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.integration.async.AsyncItemProcessor;
import org.springframework.batch.integration.async.AsyncItemWriter;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

@Configuration
@EnableBatchProcessing
public class MyJobConfig {

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
    }

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return new MyItemProcessor();
    }

    @Bean
    public AsyncItemProcessor<Integer, Integer> asyncItemProcessor() {
        AsyncItemProcessor<Integer, Integer> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(itemProcessor());
        asyncItemProcessor.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return asyncItemProcessor;
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                System.out.println(Thread.currentThread().getName() + ": item = " + item);
            }
        };
    }

    @Bean
    public AsyncItemWriter<Integer> asyncItemWriter() {
        AsyncItemWriter<Integer> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(itemWriter());
        return asyncItemWriter;
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("myJob")
                .start(steps.get("myStep")
                        .<Integer, Future<Integer>>chunk(5)
                        .reader(itemReader())
                        .processor(asyncItemProcessor())
                        .writer(asyncItemWriter())
                        // register the delegate explicitly so its @BeforeStep method is invoked
                        .listener(itemProcessor())
                        .build())
                .build();
    }

    static class MyItemProcessor implements ItemProcessor<Integer, Integer> {

        private StepExecution stepExecution;

        @Override
        public Integer process(Integer item) throws Exception {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + ": processing item " + item
                    + " as part of step " + stepExecution.getStepName());
            return item + 1;
        }

        @BeforeStep
        public void saveStepExecution(StepExecution stepExecution) {
            this.stepExecution = stepExecution;
        }
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJobConfig.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }
}
This prints:
SimpleAsyncTaskExecutor-1: processing item 0 as part of step myStep
SimpleAsyncTaskExecutor-2: processing item 1 as part of step myStep
SimpleAsyncTaskExecutor-3: processing item 2 as part of step myStep
SimpleAsyncTaskExecutor-4: processing item 3 as part of step myStep
SimpleAsyncTaskExecutor-5: processing item 4 as part of step myStep
main: item = 1
main: item = 2
main: item = 3
main: item = 4
main: item = 5
SimpleAsyncTaskExecutor-6: processing item 5 as part of step myStep
SimpleAsyncTaskExecutor-7: processing item 6 as part of step myStep
SimpleAsyncTaskExecutor-8: processing item 7 as part of step myStep
SimpleAsyncTaskExecutor-9: processing item 8 as part of step myStep
SimpleAsyncTaskExecutor-10: processing item 9 as part of step myStep
main: item = 6
main: item = 7
main: item = 8
main: item = 9
main: item = 10
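That said, relying on the execution context in a multi-threaded setup is not recommended, because that context is shared between threads.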
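One way to limit exposure to that shared state, offered here only as a sketch, is to copy the value you need once before any worker thread starts and treat the copy as read-only afterwards. The example below uses plain Java with no Spring Batch types; `SharedContextSketch`, `SequenceTaggingProcessor` and the `Map` standing in for the execution context are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedContextSketch {

    /** Hypothetical processor that snapshots the value once, before workers start. */
    static class SequenceTaggingProcessor {
        private final int sequenceNumber; // final, so safe to read from many threads

        SequenceTaggingProcessor(Map<String, Object> sharedContext) {
            this.sequenceNumber = (Integer) sharedContext.get("sequenceNumber");
        }

        String process(int item) {
            return sequenceNumber + "-" + item;
        }
    }

    /** Processes items on a thread pool while the shared map is being overwritten. */
    public static List<String> runWithSnapshot(int sequenceNumber, int itemCount) {
        // Stand-in for a context object that every thread can see and modify.
        Map<String, Object> sharedContext = new ConcurrentHashMap<>();
        sharedContext.put("sequenceNumber", sequenceNumber);

        // The snapshot is taken here, on the calling thread, before any task is submitted.
        SequenceTaggingProcessor processor = new SequenceTaggingProcessor(sharedContext);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (int i = 0; i < itemCount; i++) {
                final int item = i;
                futures.add(pool.submit(() -> {
                    // Simulates some other component overwriting the shared key mid-step.
                    sharedContext.put("sequenceNumber", -1);
                    return processor.process(item);
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // collected in submission order
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithSnapshot(42, 3)); // [42-0, 42-1, 42-2]
    }
}
```

Because the processor never goes back to the shared map while items are in flight, later writes to the same key do not affect its output. Whether a snapshot is appropriate depends on whether the value is really meant to stay constant for the whole step.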