Spring Batch not executing StepExecutionListener beforeStep or afterStep methods
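Update: I re-ran my code and noticed that none of my listeners are actually working...
I have a Spring Batch application in which I provide my own implementation of StepExecutionListener. I register it with the TaskletStep, but I never see the log messages that the beforeStep/afterStep methods should output: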
MyStepExecutionListener.java
public class MyStepExecutionListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
        // begin my own custom implementation
        LOGGER.info("Before the step!");
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // begin my own custom implementation
        LOGGER.info("After the step!");
        // afterStep must return an ExitStatus; getStatus() would return a BatchStatus
        return stepExecution.getExitStatus();
    }
}
I define my tasklet step in the BatchConfig.java class as follows:
public class BatchConfig {

    // Field declarations (stepBuilderFactory, awsS3EastBucket, maxThreads, ...) are not shown in the original post.

    @Bean
    public static org.springframework.batch.core.scope.JobScope jobScope() {
        org.springframework.batch.core.scope.JobScope jobScope = new org.springframework.batch.core.scope.JobScope();
        jobScope.setProxyTargetClass(true);
        jobScope.setAutoProxy(true);
        return jobScope;
    }

    @Bean
    public static org.springframework.batch.core.scope.StepScope stepScope() {
        org.springframework.batch.core.scope.StepScope stepScope = new org.springframework.batch.core.scope.StepScope();
        stepScope.setProxyTargetClass(true);
        stepScope.setAutoProxy(true);
        return stepScope;
    }

    @Bean
    // @StepScope
    public StepExecutionListener stepExecutionListener() {
        return new MyStepExecutionListener();
    }

    @Bean
    @Qualifier("s3FlatfFileReaderForMktgOffrs")
    @StepScope
    public S3FlatFileItemReader<FieldSet> s3FlatfFileReaderForMktgOffrs() {
        return new S3FlatFileItemReader<>(lineMapper());
    }

    @Bean
    @Qualifier("s3FlatfFileReaderCustom")
    @StepScope
    public S3FlatFileItemReader<FieldSet> s3FlatfFileReaderCustom() {
        // Custom class that extends FlatFileItemReader
        return new S3FlatFileItemReader<>(lineMapper());
    }

    @Bean
    @Qualifier("myCustomFileItemReader")
    @StepScope
    public ItemStreamReader<List<FieldSet>> myCustomFileItemReader(
            @Value("#{jobParameters}") Map<String, Object> jobParameters) {
        String fileName = (String) jobParameters.get("fileName");
        String region = (String) jobParameters.get("region");
        // Default to the east bucket; switch to the west bucket for us-west-2
        String bucketName = awsS3EastBucket;
        if (StringUtils.equals(region, Regions.US_WEST_2.getName())) {
            bucketName = awsS3WestBucket;
        }
        // Custom class that extends FlatFileItemReader
        S3FlatFileItemReader<FieldSet> s3FileItemReader = s3FlatfFileReaderCustom();
        s3FileItemReader.setResource(S3_PROTOCOL_PREFIX + bucketName + SLASH + fileName);
        s3FileItemReader.setStrict(false);
        s3FileItemReader.setLinesToSkip(1);
        s3FileItemReader.setSaveState(false);
        // Group consecutive records that share the same first and last name
        AggregateItemReader aggregateItemReader = new AggregateItemReader(s3FileItemReader) {
            @Override
            protected String getItemKey(FieldSet item) {
                return item.readString(FIRST_NAME) + "-" +
                        item.readString(LAST_NAME);
            }
        };
        SynchronizedItemStreamReader<List<FieldSet>> fieldSetSynchronizedItemStreamReader = new SynchronizedItemStreamReader<>();
        fieldSetSynchronizedItemStreamReader.setDelegate(aggregateItemReader);
        return fieldSetSynchronizedItemStreamReader;
    }

    @Bean(name = "myCustomStep")
    @Scope("prototype")
    @SuppressWarnings("unchecked")
    public Step myCustomStep(PlatformTransactionManager transactionManager) {
        TaskletStep step = stepBuilderFactory.get("myCustomStep")
                .<List<FieldSet>, List<MyPayLoadRecord>>chunk(250)
                .reader(myCustomFileItemReader(OVERRIDDEN_BY_EXPRESSION))
                .processor(myCustomProcessor())
                .writer(myCustomWriter())
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .skip(DataValidationException.class)
                .listener(stepExecutionListener())
                .listener(new CustomReaderListener())
                .listener(new CustomProcessListener())
                .listener(new CustomWriteListener())
                .listener(new CustomSkipListener())
                .taskExecutor(batchTaskExecutor())
                .throttleLimit(maxThreads)
                .build();
        step.setTransactionManager(transactionManager);
        //step.registerStepExecutionListener(stepExecutionListener());
        step.registerChunkListener(new CustomChunkListener());
        return step;
    }
}
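I have commented out step.registerStepExecutionListener(stepExecutionListener()); and tried setting the listener through the builder as shown above, but neither approach works. My impression was that I only need to implement StepExecutionListener and then register it with the TaskletStep. Am I missing something here?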
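I don't understand the reason for making your step a prototype-scoped bean, but I was unable to reproduce the issue with a setup similar to the one you shared (using Spring Batch v4.3.3). Here is a quick example: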
import java.util.Arrays;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
@EnableBatchProcessing
public class SO68217675 {

    @Bean
    // @StepScope // works with a step-scoped bean as well
    public StepExecutionListener stepExecutionListener() {
        return new MyStepExecutionListener();
    }

    @Bean
    // @Scope("prototype") // works with a prototype bean as well, but I don't understand the reason for that scope
    public Step step(StepBuilderFactory stepBuilderFactory) {
        return stepBuilderFactory.get("step")
                .<Integer, Integer>chunk(2)
                .reader(new ListItemReader<>(Arrays.asList(1, 2, 3, 4)))
                .writer(items -> items.forEach(System.out::println))
                .listener(stepExecutionListener())
                .build();
    }

    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
        return jobBuilderFactory.get("job")
                .start(step(stepBuilderFactory))
                .build();
    }

    static class MyStepExecutionListener implements StepExecutionListener {

        @Override
        public void beforeStep(StepExecution stepExecution) {
            System.out.println("MyStepExecutionListener.beforeStep");
        }

        @Override
        public ExitStatus afterStep(StepExecution stepExecution) {
            System.out.println("MyStepExecutionListener.afterStep");
            return ExitStatus.COMPLETED;
        }
    }

    public static void main(String[] args) throws Exception {
        ApplicationContext context = new AnnotationConfigApplicationContext(SO68217675.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }
}
This example prints the following (as expected):
MyStepExecutionListener.beforeStep
1
2
3
4
MyStepExecutionListener.afterStep
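Following @MahmoudBenHassine's answer: by implementing the listeners (the step listener or any of the others) as static nested classes in my BatchConfiguration.java, I was able to use the Slf4j logger and see that the listeners were indeed working. Still, I was puzzled as to why this did not seem to work when I defined the listeners in their own package/class. I then started to suspect the LOGGERs, since that was essentially how I was verifying whether things "worked". So I put some dummy System.out.println("") output in my StepExecutionListener (the problematic one, defined in its own custom class/package) and, lo and behold, I saw the output.
So the listeners had been working all along; it was the LOGGERs that were not. My verification method was poor: using LOGGER output as evidence of whether the Step/Chunk/Job listeners work was hiding the real problem.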
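One way to check that a listener fires without depending on the logging setup is to have the listener record its own invocations. The sketch below is plain Java with no Spring dependency: StepCallbacks is a hypothetical stand-in for StepExecutionListener, and runStep only imitates the before/after call order, so every name in it is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RecordingListenerDemo {

    /** Hypothetical stand-in for Spring Batch's StepExecutionListener. */
    public interface StepCallbacks {
        void beforeStep(String stepName);
        void afterStep(String stepName);
    }

    /** Records each callback in memory, so verification does not depend on any logger. */
    public static class RecordingListener implements StepCallbacks {
        private final List<String> events = new ArrayList<>();

        @Override
        public void beforeStep(String stepName) {
            events.add("before:" + stepName);
        }

        @Override
        public void afterStep(String stepName) {
            events.add("after:" + stepName);
        }

        /** Returns a copy of the recorded events in call order. */
        public List<String> events() {
            return new ArrayList<>(events);
        }
    }

    /** Imitates a step run: before callback, the work itself, then the after callback. */
    public static void runStep(String stepName, StepCallbacks listener, Runnable work) {
        listener.beforeStep(stepName);
        try {
            work.run();
        } finally {
            listener.afterStep(stepName);
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        runStep("myCustomStep", listener, () -> { });
        // prints [before:myCustomStep, after:myCustomStep]
        System.out.println(listener.events());
    }
}
```

In a real job the same idea would apply to a StepExecutionListener implementation: assert on recorded state (or print to System.out, as done above) rather than on log output.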
Based on this post, I became more curious about what was wrong with the slf4j/log4j dependencies in my POM, or what I might be missing:
Spring Batch produces no logging output when launched from the command line
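In addition, I had set up logging in my POM.xml according to the article below, which recommends removing spring-boot-starter-logging from all spring-boot-starters (which I did) and adding a single spring-boot-starter-log4j2 dependency if you intend to use log4j (as I do):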
https://www.baeldung.com/spring-boot-logging
Anyway, I removed the <exclusion> of spring-boot-starter-logging from the spring-boot-starter-batch dependency defined in my POM, and logging now (seemingly) works in the listeners (Step/Chunk/Job) defined in their own classes.
When I inspect my project dependencies in IntelliJ, I have:
log4j:log4j:1.2.15
org.apache.logging.log4j:log4j-api:2.13.3
org.apache.logging.log4j:log4j-core:2.13.3
org.apache.logging.log4j:log4j-jul:2.13.3
org.apache.logging.log4j:log4j-slf4j-impl:2.13.3
org.slf4j:jcl-over-slf4j:1.7.30
org.slf4j:jul-to-slf4j:1.7.30
org.slf4j:log4j-over-slf4j:1.7.30
org.slf4j:slf4j-api:1.7.30
I noticed that after removing the exclusion on spring-boot-starter-batch, as described above, this dependency was added:
org.apache.logging.log4j:log4j-to-slf4j:2.13.3
So I am assuming Spring Batch needs this when the listeners' before/after methods log. Hope this helps someone!
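As a sketch of how one might investigate a setup like the one above: SLF4J 1.7.x locates its backend by looking up the class org.slf4j.impl.StaticLoggerBinder on the classpath, and falls back to a no-operation logger when none is found. The plain-JDK snippet below lists every classpath location that provides that class, which can show whether zero, one, or several bindings are present. The class name LoggingBindingCheck and its method names are assumptions made for illustration; they are not part of Spring Batch or SLF4J.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class LoggingBindingCheck {

    /** Class file that SLF4J 1.7.x looks up to find its logging backend. */
    public static final String SLF4J_BINDER = "org/slf4j/impl/StaticLoggerBinder.class";

    /** Returns the location of every copy of the given resource visible to this class's loader. */
    public static List<URL> findResources(String resourcePath) {
        ClassLoader loader = LoggingBindingCheck.class.getClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resourcePath));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<URL> bindings = findResources(SLF4J_BINDER);
        if (bindings.isEmpty()) {
            System.out.println("No SLF4J 1.7 binding found on the classpath");
        }
        for (URL binding : bindings) {
            System.out.println("SLF4J binding: " + binding);
        }
    }
}
```

Run with the application's classpath, this prints one line per jar that ships a binding. More than one line, or none, would be worth comparing against the dependency list shown in the IDE.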