Spring Batch 处理器没有运行 ItemProcessorListener
Spring Batch Processor not running ItemProcessorListener
我在 Spring Batch 3.0.7.RELEASE 和 Spring 4.3.2.RELEASE 中遇到一个问题:我的 ItemProcessor 类中的监听器没有运行。
在 @StepScope 级别的常规注入对 @Value("#{jobExecutionContext['" + Constants.SECURITY_TOKEN + "']}") 是有效的,如下所示。
但它对 beforeProcess 或 beforeStep 不起作用,注解版本和接口版本我都试过了。我几乎 100% 确定这在某个时候是可以正常工作的,但弄不清它为什么失效了。
有什么想法吗?是不是我的配置看起来有问题?
AppBatchConfiguration.java
/**
 * Spring Batch Java configuration for the job {@code AppJob}.
 *
 * <p>Declares the XML readers, the JAXB marshaller/unmarshaller, the XML writer, the two
 * item processors, the two steps ({@code AppStep}, {@code BatchCreationStep}), the job
 * completion listener and the task executor used by both steps.
 */
@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = "our.org.base")
public class AppBatchConfiguration {
private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class);
// Both placeholders are null. They are only passed as arguments when the step-scoped
// bean methods are called from AppStep()/BatchCreationStep(); see the comment in AppStep().
private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null;
private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
/**
 * Step-scoped reader bean registered under the name {@code cimAppXmlReader}.
 *
 * <p>Builds a {@code StaxEventItemReader} over the given file, using
 * {@link #mecaUnMarshaller()} and the fragment root element names from
 * {@link #getAppRootElementNames()}, with save-state disabled, and returns it wrapped
 * in a {@code SynchronizedItemStreamReader}.
 *
 * @param inputXmlFilePath value of the {@code inputXmlFilePath} job parameter
 * @return the synchronized wrapper around the StAX reader
 */
@Bean(name = "cimAppXmlReader")
@StepScope
public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath);
StaxEventItemReader<T> reader = new StaxEventItemReader<T>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
reader.setFragmentRootElementNames(getAppRootElementNames());
reader.setSaveState(false);
// Make the StaxEventItemReader thread-safe
SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>();
synchronizedItemStreamReader.setDelegate(reader);
return synchronizedItemStreamReader;
}
/**
 * Step-scoped reader bean for the transaction header.
 *
 * <p>Reads the same input file as {@code appXmlReader} but only the
 * {@code AppIBTransactionHeader} fragment; save-state is disabled and the reader is
 * returned without a synchronizing wrapper.
 *
 * @param inputXmlFilePath value of the {@code inputXmlFilePath} job parameter
 * @return the configured StAX reader
 */
@Bean
@StepScope
public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath);
StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"};
reader.setFragmentRootElementNames(fragmentRootElementNames);
reader.setSaveState(false);
return reader;
}
/**
 * Unmarshaller used by both readers: a {@code Jaxb2Marshaller} that scans the package
 * containing {@code ObjectFactory}.
 */
@Bean
public Unmarshaller mecaUnMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName());
return marshaller;
}
/**
 * Marshaller used by the writer: a {@code Jaxb2Marshaller} bound to
 * {@code ServiceRequestType} with {@code JAXBElement} support switched on.
 */
@Bean
public Marshaller uberMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(ServiceRequestType.class);
marshaller.setSupportJaxbElementClass(true);
return marshaller;
}
/**
 * Step-scoped XML writer bean.
 *
 * <p>Writes to the given output file through {@link #uberMarshaller()}, with save-state
 * disabled, root tag {@code ns1:SetOfServiceRequests} and the {@code xmlns:ns1}
 * namespace attribute set on the root element.
 *
 * @param outputXmlFilePath value of the {@code outputXmlFilePath} job parameter
 * @return the configured writer
 */
@Bean(destroyMethod="") // destroyMethod="" is set to stop multiple close calls (the reference link is missing in this copy)
@StepScope
public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}")
String outputXmlFilePath) {
SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>();
writer.setResource(new FileSystemResource(outputXmlFilePath));
writer.setMarshaller(uberMarshaller());
writer.setSaveState(false);
HashMap<String, String> rootElementAttribs = new HashMap<String, String>();
rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1");
writer.setRootElementAttributes(rootElementAttribs);
writer.setRootTagName("ns1:SetOfServiceRequests");
return writer;
}
/**
 * Step-scoped processor bean used by {@code AppStep}; creates a new
 * {@code AppBatchNotificationItemProcessor}.
 *
 * <p>NOTE(review): the declared return type is the {@code ItemProcessor} interface, not
 * the concrete class. The listener callbacks defined on
 * {@code AppBatchNotificationItemProcessor} are reported as not being invoked with this
 * configuration; see the discussion that follows the listings.
 */
@Bean
@StepScope
public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() {
return new AppBatchNotificationItemProcessor<T>();
}
/**
 * Processor bean used by {@code BatchCreationStep}; creates a new
 * {@code AppBatchCreationItemProcessor}. This bean is not step-scoped.
 */
@Bean
public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() {
return new AppBatchCreationItemProcessor();
}
/**
 * Returns the fragment root element names for the main reader, as supplied by
 * {@code AppProcessorEnum.getValues()}. Not a bean method.
 */
public String[] getAppRootElementNames() {
//get list of App Transaction Element Names
return AppProcessorEnum.getValues();
}
/**
 * Main step: chunk size 10, reading with {@code appXmlReader}, processing with
 * {@code appNotificationProcessor} and writing with {@code writer}, run on
 * {@link #concurrentTaskExecutor()} with a throttle limit of 1. No listener is
 * registered explicitly on this step.
 */
@Bean
public Step AppStep() {
// INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden
// by injected jobParameters using late binding (StepScope)
return stepBuilderFactory.get("AppStep")
.<Object, JAXBElement<ServiceRequestType>> chunk(10)
.reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appNotificationProcessor())
.writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
}
/**
 * Second step: chunk size 1, reading the transaction header and processing it with
 * {@code appBatchCreationProcessor}, run on {@link #concurrentTaskExecutor()} with a
 * throttle limit of 1.
 *
 * <p>NOTE(review): no {@code writer(...)} is configured on this step — confirm that is
 * intended.
 */
@Bean
public Step BatchCreationStep() {
return stepBuilderFactory.get("BatchCreationStep")
.<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1)
.reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appBatchCreationProcessor())
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
}
/**
 * The job: uses a {@code RunIdIncrementer}, registers the job completion listener, and
 * runs {@code AppStep} followed by {@code BatchCreationStep}.
 */
@Bean
public Job AppJob() {
return jobBuilderFactory.get("AppJob")
.incrementer(new RunIdIncrementer())
.listener(AppJobCompletionNotificationListener())
.flow(AppStep())
.next(BatchCreationStep())
.end()
.build();
}
/** Job-level listener bean registered on {@code AppJob}. */
@Bean
public JobCompletionNotificationListener AppJobCompletionNotificationListener() {
return new JobCompletionNotificationListener();
}
/**
 * Task executor shared by both steps: a {@code SimpleAsyncTaskExecutor} with its
 * concurrency limit set to 1.
 */
@Bean
public TaskExecutor concurrentTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(1);
return taskExecutor;
}
}
AppBatchNotificationItemProcessor.java
/**
 * Item processor for {@code AppStep} that turns an input item into a
 * {@code JAXBElement<ServiceRequestType>}.
 *
 * <p>Besides {@code ItemProcessor}, the class implements {@code StepExecutionListener} and
 * declares a {@code @BeforeProcess} callback. In the configuration shown alongside this
 * class, those callbacks are reported as not being invoked, while the {@code @Value}
 * injection of the security token works.
 *
 * @param <E> type of the items read by the step
 */
@StepScope
public class AppBatchNotificationItemProcessor<E> extends AppAbstractItemProcessor<E, JAXBElement<ServiceRequestType>> implements ItemProcessor<E, JAXBElement<ServiceRequestType>>, StepExecutionListener {

    // This is populated correctly
    @Value("#{jobExecutionContext['" + Constants.SECURITY_TOKEN + "']}")
    private SecurityToken securityToken;

    @Autowired
    private AppProcessorService processor;

    /**
     * Processes one item. The real body is elided in this listing ("Do Stuff"), so
     * {@code srRequest} is produced by code that is not shown.
     *
     * @param item the item read by the step
     * @return the service request built for {@code item}
     * @throws BPException if processing fails
     */
    @Override
    public JAXBElement<ServiceRequestType> process(E item) throws BPException {
        // Do Stuff
        return srRequest;
    }

    /**
     * Annotation-based listener callback intended to run before each item is processed.
     *
     * @param item the item about to be processed
     */
    @BeforeProcess
    public void beforeProcess(E item) {
        System.out.println("Doesn't execute");
    }

    /**
     * Interface-based listener callback intended to run before the step starts.
     *
     * @param stepExecution the current step execution
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Doesn't execute
        System.out.println("Doesn't execute");
    }

    /**
     * Interface-based listener callback intended to run after the step finishes.
     *
     * @param stepExecution the current step execution
     * @return {@code null}, which leaves the step's existing exit status unchanged
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Doesn't execute
        System.out.println("Doesn't execute");
        // The method must return a value to compile; null means "keep the current exit status".
        return null;
    }
}
据我所知,如果使用了 StepScope,就必须直接在步骤(step)上把 reader、writer、processor 注册为监听器。
StepScope 会使框架无法识别代理背后实际实现了哪些接口、定义了哪些注解(例如 @BeforeProcess),因此框架无法自动将其注册为监听器。
所以,我认为如果像下面这样加上 .listener(appNotificationProcessor()) 这一行:
return stepBuilderFactory.get("AppStep")
.<Object, JAXBElement<ServiceRequestType>> chunk(10)
.reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appNotificationProcessor())
.writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
.listener(appNotificationProcessor()) // added line: registers the step-scoped processor on the step as a listener
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
它会起作用。
这是因为您在 @Bean 方法中返回的是接口,而不是实现类。依我之见,在 Spring 中使用 Java 配置时,应该返回尽可能具体的类型。原因如下:
通过 XML 配置时,您在 XML 配置中提供的是类(class)。这样就把实现类暴露给了 Spring,该类实现的所有接口都可以被发现并得到适当处理。使用 Java 配置时,@Bean 方法的返回类型充当了这一信息的替代。问题就出在这里:如果返回类型是一个接口,Spring 只知道这一个特定的接口,而不知道实现类可能实现的所有接口。尽可能返回具体类型,可以让 Spring 了解您实际返回的内容,从而更好地为您处理各种注册和装配场景。
就您的具体例子而言,由于您返回的是 ItemProcessor,并且它是 step 作用域的(因此会被代理),Spring 只知道 ItemProcessor 接口所预期的方法/行为。
如果您返回实现类(AppBatchNotificationItemProcessor),其他行为就可以被自动配置。
我在 Spring Batch 3.0.7.RELEASE 和 Spring 4.3.2.RELEASE 中遇到一个问题:我的 ItemProcessor 类中的监听器没有运行。
在 @StepScope 级别的常规注入对 @Value("#{jobExecutionContext['" + Constants.SECURITY_TOKEN + "']}") 是有效的,如下所示。
但它对 beforeProcess 或 beforeStep 不起作用,注解版本和接口版本我都试过了。我几乎 100% 确定这在某个时候是可以正常工作的,但弄不清它为什么失效了。
有什么想法吗?是不是我的配置看起来有问题?
AppBatchConfiguration.java
/**
 * Spring Batch Java configuration for the job {@code AppJob}.
 *
 * <p>Declares the XML readers, the JAXB marshaller/unmarshaller, the XML writer, the two
 * item processors, the two steps ({@code AppStep}, {@code BatchCreationStep}), the job
 * completion listener and the task executor used by both steps.
 */
@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = "our.org.base")
public class AppBatchConfiguration {
private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class);
// Both placeholders are null. They are only passed as arguments when the step-scoped
// bean methods are called from AppStep()/BatchCreationStep(); see the comment in AppStep().
private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null;
private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
/**
 * Step-scoped reader bean registered under the name {@code cimAppXmlReader}.
 *
 * <p>Builds a {@code StaxEventItemReader} over the given file, using
 * {@link #mecaUnMarshaller()} and the fragment root element names from
 * {@link #getAppRootElementNames()}, with save-state disabled, and returns it wrapped
 * in a {@code SynchronizedItemStreamReader}.
 *
 * @param inputXmlFilePath value of the {@code inputXmlFilePath} job parameter
 * @return the synchronized wrapper around the StAX reader
 */
@Bean(name = "cimAppXmlReader")
@StepScope
public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath);
StaxEventItemReader<T> reader = new StaxEventItemReader<T>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
reader.setFragmentRootElementNames(getAppRootElementNames());
reader.setSaveState(false);
// Make the StaxEventItemReader thread-safe
SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>();
synchronizedItemStreamReader.setDelegate(reader);
return synchronizedItemStreamReader;
}
/**
 * Step-scoped reader bean for the transaction header.
 *
 * <p>Reads the same input file as {@code appXmlReader} but only the
 * {@code AppIBTransactionHeader} fragment; save-state is disabled and the reader is
 * returned without a synchronizing wrapper.
 *
 * @param inputXmlFilePath value of the {@code inputXmlFilePath} job parameter
 * @return the configured StAX reader
 */
@Bean
@StepScope
public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath);
StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"};
reader.setFragmentRootElementNames(fragmentRootElementNames);
reader.setSaveState(false);
return reader;
}
/**
 * Unmarshaller used by both readers: a {@code Jaxb2Marshaller} that scans the package
 * containing {@code ObjectFactory}.
 */
@Bean
public Unmarshaller mecaUnMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName());
return marshaller;
}
/**
 * Marshaller used by the writer: a {@code Jaxb2Marshaller} bound to
 * {@code ServiceRequestType} with {@code JAXBElement} support switched on.
 */
@Bean
public Marshaller uberMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(ServiceRequestType.class);
marshaller.setSupportJaxbElementClass(true);
return marshaller;
}
/**
 * Step-scoped XML writer bean.
 *
 * <p>Writes to the given output file through {@link #uberMarshaller()}, with save-state
 * disabled, root tag {@code ns1:SetOfServiceRequests} and the {@code xmlns:ns1}
 * namespace attribute set on the root element.
 *
 * @param outputXmlFilePath value of the {@code outputXmlFilePath} job parameter
 * @return the configured writer
 */
@Bean(destroyMethod="") // destroyMethod="" is set to stop multiple close calls (the reference link is missing in this copy)
@StepScope
public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}")
String outputXmlFilePath) {
SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>();
writer.setResource(new FileSystemResource(outputXmlFilePath));
writer.setMarshaller(uberMarshaller());
writer.setSaveState(false);
HashMap<String, String> rootElementAttribs = new HashMap<String, String>();
rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1");
writer.setRootElementAttributes(rootElementAttribs);
writer.setRootTagName("ns1:SetOfServiceRequests");
return writer;
}
/**
 * Step-scoped processor bean used by {@code AppStep}; creates a new
 * {@code AppBatchNotificationItemProcessor}.
 *
 * <p>NOTE(review): the declared return type is the {@code ItemProcessor} interface, not
 * the concrete class. The listener callbacks defined on
 * {@code AppBatchNotificationItemProcessor} are reported as not being invoked with this
 * configuration; see the discussion that follows the listings.
 */
@Bean
@StepScope
public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() {
return new AppBatchNotificationItemProcessor<T>();
}
/**
 * Processor bean used by {@code BatchCreationStep}; creates a new
 * {@code AppBatchCreationItemProcessor}. This bean is not step-scoped.
 */
@Bean
public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() {
return new AppBatchCreationItemProcessor();
}
/**
 * Returns the fragment root element names for the main reader, as supplied by
 * {@code AppProcessorEnum.getValues()}. Not a bean method.
 */
public String[] getAppRootElementNames() {
//get list of App Transaction Element Names
return AppProcessorEnum.getValues();
}
/**
 * Main step: chunk size 10, reading with {@code appXmlReader}, processing with
 * {@code appNotificationProcessor} and writing with {@code writer}, run on
 * {@link #concurrentTaskExecutor()} with a throttle limit of 1. No listener is
 * registered explicitly on this step.
 */
@Bean
public Step AppStep() {
// INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden
// by injected jobParameters using late binding (StepScope)
return stepBuilderFactory.get("AppStep")
.<Object, JAXBElement<ServiceRequestType>> chunk(10)
.reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appNotificationProcessor())
.writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
}
/**
 * Second step: chunk size 1, reading the transaction header and processing it with
 * {@code appBatchCreationProcessor}, run on {@link #concurrentTaskExecutor()} with a
 * throttle limit of 1.
 *
 * <p>NOTE(review): no {@code writer(...)} is configured on this step — confirm that is
 * intended.
 */
@Bean
public Step BatchCreationStep() {
return stepBuilderFactory.get("BatchCreationStep")
.<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1)
.reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appBatchCreationProcessor())
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
}
/**
 * The job: uses a {@code RunIdIncrementer}, registers the job completion listener, and
 * runs {@code AppStep} followed by {@code BatchCreationStep}.
 */
@Bean
public Job AppJob() {
return jobBuilderFactory.get("AppJob")
.incrementer(new RunIdIncrementer())
.listener(AppJobCompletionNotificationListener())
.flow(AppStep())
.next(BatchCreationStep())
.end()
.build();
}
/** Job-level listener bean registered on {@code AppJob}. */
@Bean
public JobCompletionNotificationListener AppJobCompletionNotificationListener() {
return new JobCompletionNotificationListener();
}
/**
 * Task executor shared by both steps: a {@code SimpleAsyncTaskExecutor} with its
 * concurrency limit set to 1.
 */
@Bean
public TaskExecutor concurrentTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(1);
return taskExecutor;
}
}
AppBatchNotificationItemProcessor.java
/**
 * Item processor for {@code AppStep} that turns an input item into a
 * {@code JAXBElement<ServiceRequestType>}.
 *
 * <p>Besides {@code ItemProcessor}, the class implements {@code StepExecutionListener} and
 * declares a {@code @BeforeProcess} callback. In the configuration shown alongside this
 * class, those callbacks are reported as not being invoked, while the {@code @Value}
 * injection of the security token works.
 *
 * @param <E> type of the items read by the step
 */
@StepScope
public class AppBatchNotificationItemProcessor<E> extends AppAbstractItemProcessor<E, JAXBElement<ServiceRequestType>> implements ItemProcessor<E, JAXBElement<ServiceRequestType>>, StepExecutionListener {

    // This is populated correctly
    @Value("#{jobExecutionContext['" + Constants.SECURITY_TOKEN + "']}")
    private SecurityToken securityToken;

    @Autowired
    private AppProcessorService processor;

    /**
     * Processes one item. The real body is elided in this listing ("Do Stuff"), so
     * {@code srRequest} is produced by code that is not shown.
     *
     * @param item the item read by the step
     * @return the service request built for {@code item}
     * @throws BPException if processing fails
     */
    @Override
    public JAXBElement<ServiceRequestType> process(E item) throws BPException {
        // Do Stuff
        return srRequest;
    }

    /**
     * Annotation-based listener callback intended to run before each item is processed.
     *
     * @param item the item about to be processed
     */
    @BeforeProcess
    public void beforeProcess(E item) {
        System.out.println("Doesn't execute");
    }

    /**
     * Interface-based listener callback intended to run before the step starts.
     *
     * @param stepExecution the current step execution
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        // Doesn't execute
        System.out.println("Doesn't execute");
    }

    /**
     * Interface-based listener callback intended to run after the step finishes.
     *
     * @param stepExecution the current step execution
     * @return {@code null}, which leaves the step's existing exit status unchanged
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Doesn't execute
        System.out.println("Doesn't execute");
        // The method must return a value to compile; null means "keep the current exit status".
        return null;
    }
}
据我所知,如果使用了 StepScope,就必须直接在步骤(step)上把 reader、writer、processor 注册为监听器。
StepScope 会使框架无法识别代理背后实际实现了哪些接口、定义了哪些注解(例如 @BeforeProcess),因此框架无法自动将其注册为监听器。
所以,我认为如果像下面这样加上 .listener(appNotificationProcessor()) 这一行:
return stepBuilderFactory.get("AppStep")
.<Object, JAXBElement<ServiceRequestType>> chunk(10)
.reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appNotificationProcessor())
.writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
.listener(appNotificationProcessor()) // added line: registers the step-scoped processor on the step as a listener
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
它会起作用。
这是因为您在 @Bean 方法中返回的是接口,而不是实现类。依我之见,在 Spring 中使用 Java 配置时,应该返回尽可能具体的类型。原因如下:
通过 XML 配置时,您在 XML 配置中提供的是类(class)。这样就把实现类暴露给了 Spring,该类实现的所有接口都可以被发现并得到适当处理。使用 Java 配置时,@Bean 方法的返回类型充当了这一信息的替代。问题就出在这里:如果返回类型是一个接口,Spring 只知道这一个特定的接口,而不知道实现类可能实现的所有接口。尽可能返回具体类型,可以让 Spring 了解您实际返回的内容,从而更好地为您处理各种注册和装配场景。
就您的具体例子而言,由于您返回的是 ItemProcessor,并且它是 step 作用域的(因此会被代理),Spring 只知道 ItemProcessor 接口所预期的方法/行为。
如果您返回实现类(AppBatchNotificationItemProcessor),其他行为就可以被自动配置。