Spring 项目处理器的批处理原型范围
Spring Batch Prototype Scope for Item Processor
所以我在 Spring 批处理 3.0.7.RELEASE
和 Spring 4.3.2.RELEASE
中遇到问题,我们想在使用时使用 ItemProcessor 的 prototype
范围并发。
见appBatchCreationProcessor()
和BatchCreationStep()
,我试过把appBatchCreationProcessor
的范围设为prototype
,但是好像没有什么效果,所有 10 个线程都使用相同的项目处理器。
有办法解决这个问题吗?或者这是设计使然?
AppBatchConfiguration.java
@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = "our.org.base")
public class AppBatchConfiguration {
private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class);
private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null;
private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean(name = "cimAppXmlReader")
@StepScope
public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath);
StaxEventItemReader<T> reader = new StaxEventItemReader<T>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
reader.setFragmentRootElementNames(getAppRootElementNames());
reader.setSaveState(false);
// Make the StaxEventItemReader thread-safe
SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>();
synchronizedItemStreamReader.setDelegate(reader);
return synchronizedItemStreamReader;
}
@Bean
@StepScope
public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath);
StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"};
reader.setFragmentRootElementNames(fragmentRootElementNames);
reader.setSaveState(false);
return reader;
}
@Bean
public Unmarshaller mecaUnMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName());
return marshaller;
}
@Bean
public Marshaller uberMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(ServiceRequestType.class);
marshaller.setSupportJaxbElementClass(true);
return marshaller;
}
@Bean(destroyMethod="") // To stop multiple close calls, see:
@StepScope
public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}")
String outputXmlFilePath) {
SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>();
writer.setResource(new FileSystemResource(outputXmlFilePath));
writer.setMarshaller(uberMarshaller());
writer.setSaveState(false);
HashMap<String, String> rootElementAttribs = new HashMap<String, String>();
rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1");
writer.setRootElementAttributes(rootElementAttribs);
writer.setRootTagName("ns1:SetOfServiceRequests");
return writer;
}
@Bean
@StepScope
public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() {
return new AppBatchNotificationItemProcessor<T>();
}
@Bean
@Scope(scopeName=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() {
return new AppBatchCreationItemProcessor();
}
public String[] getAppRootElementNames() {
//get list of App Transaction Element Names
return AppProcessorEnum.getValues();
}
@Bean
public Step AppStep() {
// INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden
// by injected jobParameters using late binding (StepScope)
return stepBuilderFactory.get("AppStep")
.<Object, JAXBElement<ServiceRequestType>> chunk(10)
.reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appNotificationProcessor())
.writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
}
@Bean
public Step BatchCreationStep() {
return stepBuilderFactory.get("BatchCreationStep")
.<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1)
.reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appBatchCreationProcessor())
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(10)
.build();
}
@Bean
public Job AppJob() {
return jobBuilderFactory.get("AppJob")
.incrementer(new RunIdIncrementer())
.listener(AppJobCompletionNotificationListener())
.flow(AppStep())
.next(BatchCreationStep())
.end()
.build();
}
@Bean
public JobCompletionNotificationListener AppJobCompletionNotificationListener() {
return new JobCompletionNotificationListener();
}
@Bean
public TaskExecutor concurrentTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(10);
return taskExecutor;
}
}
在所有线程中使用相同的 appBatchCreationProcessor()
的原因是因为它被注入到单例 BatchCreationStep() 中。此外, BatchCreationStep()
也被注入到单例中 AppJob()
When you use singleton-scoped beans with dependencies on prototype
beans, be aware that dependencies are resolved at instantiation time.
Thus if you dependency-inject a prototype-scoped bean into a
singleton-scoped bean, a new prototype bean is instantiated and then
dependency-injected into the singleton bean. The prototype instance is
the sole instance that is ever supplied to the singleton-scoped bean.
但是,如果您确实需要创建一个新的 appBatchCreationProcessor()
,您可以使用 method injection
是的,这是设计使然。
将作业及其步骤及其 reader、处理器和编写器视为在作业执行之前创建的静态结构。这意味着,在执行作业之前,已经调用了适当的 createReader、createProcessor 方法并且已经创建了 bean 实例。
SCOPE_PROTOTYPE 在此阶段进行评估,并且由于您的 createProcessor 方法在此阶段仅被调用一次,因此只有一个实例。
作业启动后,该结构保持"stable"。
现在,Spring Batch 通过提供 "stepscope" 将 bean 创建推迟到步骤的开始来稍微调整一下。但是,如果您 运行 您的步骤有多个线程,这将无济于事。仍然只有一个实例,比方说您示例中的处理器,并且该实例用于所有线程。
您需要的是 "ThreadScope" 之类的东西,但 spring 或 spring 批次中没有这样的概念。您需要相应地实现您的处理器,例如通过使用 ThreadLocal 成员。
例如,您可以将处理器包装成这样:
public class ThreadLocalItemProcessor implements ItemProcessor {
private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(() -> new MyProcessor());
@Override
public Object process(Object item) throws Exception {
return threadProcessor.get().process(item);
}
}
编辑:带有原型方法的示例
如果你的 Processor 被实例化为一个 SpringBean,它也可以使用 Autowired 进行注入。因此,您可以注入一个原型工厂(当然,原型工厂必须实例化为 springbean),如下所示:
@Configuration
public class PrototypeFactory {
@Bean
@Scope(Prototype)
public YourInterfaceOrClass createInstance() {
return new YourInterfaceOrClass();
}
}
public class ThreadLocalItemProcessor implements ItemProcessor {
@Autowired
private PrototypeFactory prototypeFactory;
private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(this::processorCreator);
@Override
public Object process(Object item) throws Exception {
return threadProcessor.get().process(item);
}
//ItemProcessor directly implemented as lambda
// this will only be called once per working thread
private Object process(Object input) {
// will produce a valid SpringBean instance
YourInterfaceOrClass inst = prototypeFactory.createInstance();
... process the input
}
}
所以我在 Spring 批处理 3.0.7.RELEASE
和 Spring 4.3.2.RELEASE
中遇到问题,我们想在使用时使用 ItemProcessor 的 prototype
范围并发。
见appBatchCreationProcessor()
和BatchCreationStep()
,我试过把appBatchCreationProcessor
的范围设为prototype
,但是好像没有什么效果,所有 10 个线程都使用相同的项目处理器。
有办法解决这个问题吗?或者这是设计使然?
AppBatchConfiguration.java
@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = "our.org.base")
public class AppBatchConfiguration {
private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class);
private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null;
private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null;
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Bean(name = "cimAppXmlReader")
@StepScope
public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath);
StaxEventItemReader<T> reader = new StaxEventItemReader<T>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
reader.setFragmentRootElementNames(getAppRootElementNames());
reader.setSaveState(false);
// Make the StaxEventItemReader thread-safe
SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>();
synchronizedItemStreamReader.setDelegate(reader);
return synchronizedItemStreamReader;
}
@Bean
@StepScope
public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}")
String inputXmlFilePath) {
LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath);
StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>();
reader.setResource(new FileSystemResource(inputXmlFilePath));
reader.setUnmarshaller(mecaUnMarshaller());
String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"};
reader.setFragmentRootElementNames(fragmentRootElementNames);
reader.setSaveState(false);
return reader;
}
@Bean
public Unmarshaller mecaUnMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName());
return marshaller;
}
@Bean
public Marshaller uberMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(ServiceRequestType.class);
marshaller.setSupportJaxbElementClass(true);
return marshaller;
}
@Bean(destroyMethod="") // To stop multiple close calls, see:
@StepScope
public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}")
String outputXmlFilePath) {
SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>();
writer.setResource(new FileSystemResource(outputXmlFilePath));
writer.setMarshaller(uberMarshaller());
writer.setSaveState(false);
HashMap<String, String> rootElementAttribs = new HashMap<String, String>();
rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1");
writer.setRootElementAttributes(rootElementAttribs);
writer.setRootTagName("ns1:SetOfServiceRequests");
return writer;
}
@Bean
@StepScope
public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() {
return new AppBatchNotificationItemProcessor<T>();
}
@Bean
@Scope(scopeName=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() {
return new AppBatchCreationItemProcessor();
}
public String[] getAppRootElementNames() {
//get list of App Transaction Element Names
return AppProcessorEnum.getValues();
}
@Bean
public Step AppStep() {
// INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden
// by injected jobParameters using late binding (StepScope)
return stepBuilderFactory.get("AppStep")
.<Object, JAXBElement<ServiceRequestType>> chunk(10)
.reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appNotificationProcessor())
.writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(1)
.build();
}
@Bean
public Step BatchCreationStep() {
return stepBuilderFactory.get("BatchCreationStep")
.<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1)
.reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
.processor(appBatchCreationProcessor())
.taskExecutor(concurrentTaskExecutor())
.throttleLimit(10)
.build();
}
@Bean
public Job AppJob() {
return jobBuilderFactory.get("AppJob")
.incrementer(new RunIdIncrementer())
.listener(AppJobCompletionNotificationListener())
.flow(AppStep())
.next(BatchCreationStep())
.end()
.build();
}
@Bean
public JobCompletionNotificationListener AppJobCompletionNotificationListener() {
return new JobCompletionNotificationListener();
}
@Bean
public TaskExecutor concurrentTaskExecutor() {
SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
taskExecutor.setConcurrencyLimit(10);
return taskExecutor;
}
}
在所有线程中使用相同的 appBatchCreationProcessor()
的原因是因为它被注入到单例 BatchCreationStep() 中。此外, BatchCreationStep()
也被注入到单例中 AppJob()
When you use singleton-scoped beans with dependencies on prototype beans, be aware that dependencies are resolved at instantiation time. Thus if you dependency-inject a prototype-scoped bean into a singleton-scoped bean, a new prototype bean is instantiated and then dependency-injected into the singleton bean. The prototype instance is the sole instance that is ever supplied to the singleton-scoped bean.
但是,如果您确实需要创建一个新的 appBatchCreationProcessor()
,您可以使用 method injection
是的,这是设计使然。
将作业及其步骤及其 reader、处理器和编写器视为在作业执行之前创建的静态结构。这意味着,在执行作业之前,已经调用了适当的 createReader、createProcessor 方法并且已经创建了 bean 实例。
SCOPE_PROTOTYPE 在此阶段进行评估,并且由于您的 createProcessor 方法在此阶段仅被调用一次,因此只有一个实例。
作业启动后,该结构保持"stable"。
现在,Spring Batch 通过提供 "stepscope" 将 bean 创建推迟到步骤的开始来稍微调整一下。但是,如果您 运行 您的步骤有多个线程,这将无济于事。仍然只有一个实例,比方说您示例中的处理器,并且该实例用于所有线程。
您需要的是 "ThreadScope" 之类的东西,但 spring 或 spring 批次中没有这样的概念。您需要相应地实现您的处理器,例如通过使用 ThreadLocal 成员。
例如,您可以将处理器包装成这样:
public class ThreadLocalItemProcessor implements ItemProcessor {
private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(() -> new MyProcessor());
@Override
public Object process(Object item) throws Exception {
return threadProcessor.get().process(item);
}
}
编辑:带有原型方法的示例
如果你的 Processor 被实例化为一个 SpringBean,它也可以使用 Autowired 进行注入。因此,您可以注入一个原型工厂(当然,原型工厂必须实例化为 springbean),如下所示:
@Configuration
public class PrototypeFactory {
@Bean
@Scope(Prototype)
public YourInterfaceOrClass createInstance() {
return new YourInterfaceOrClass();
}
}
public class ThreadLocalItemProcessor implements ItemProcessor {
@Autowired
private PrototypeFactory prototypeFactory;
private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(this::processorCreator);
@Override
public Object process(Object item) throws Exception {
return threadProcessor.get().process(item);
}
//ItemProcessor directly implemented as lambda
// this will only be called once per working thread
private Object process(Object input) {
// will produce a valid SpringBean instance
YourInterfaceOrClass inst = prototypeFactory.createInstance();
... process the input
}
}