Spring 项目处理器的批处理原型范围

Spring Batch Prototype Scope for Item Processor

所以我在 Spring 批处理 3.0.7.RELEASE 和 Spring 4.3.2.RELEASE 中遇到问题,我们想在使用时使用 ItemProcessor 的 prototype 范围并发。

appBatchCreationProcessor()BatchCreationStep(),我试过把appBatchCreationProcessor的范围设为prototype,但是好像没有什么效果,所有 10 个线程都使用相同的项目处理器。

有办法解决这个问题吗?或者这是设计使然?

AppBatchConfiguration.java

@Configuration
@EnableBatchProcessing
@ComponentScan(basePackages = "our.org.base")
public class AppBatchConfiguration {

    private final static SimpleLogger LOGGER = SimpleLogger.getInstance(AppBatchConfiguration.class);

    private final static String OUTPUT_XML_FILE_PATH_PLACEHOLDER = null;
    private final static String INPUT_XML_FILE_PATH_PLACEHOLDER = null;

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean(name = "cimAppXmlReader")
    @StepScope
    public <T> ItemStreamReader<T> appXmlReader(@Value("#{jobParameters[inputXmlFilePath]}")
    String inputXmlFilePath) {
        LOGGER.info("Job Parameter => App XML File Path :" + inputXmlFilePath);
        StaxEventItemReader<T> reader = new StaxEventItemReader<T>();
        reader.setResource(new FileSystemResource(inputXmlFilePath));
        reader.setUnmarshaller(mecaUnMarshaller());
        reader.setFragmentRootElementNames(getAppRootElementNames());
        reader.setSaveState(false);

        // Make the StaxEventItemReader thread-safe
        SynchronizedItemStreamReader<T> synchronizedItemStreamReader = new SynchronizedItemStreamReader<T>();
        synchronizedItemStreamReader.setDelegate(reader);

        return synchronizedItemStreamReader;
    }

    @Bean
    @StepScope
    public ItemStreamReader<JAXBElement<AppIBTransactionHeaderType>> appXmlTransactionHeaderReader(@Value("#{jobParameters[inputXmlFilePath]}")
    String inputXmlFilePath) {
        LOGGER.info("Job Parameter => App XML File Path for Transaction Header :" + inputXmlFilePath);
        StaxEventItemReader<JAXBElement<AppIBTransactionHeaderType>> reader = new StaxEventItemReader<>();
        reader.setResource(new FileSystemResource(inputXmlFilePath));
        reader.setUnmarshaller(mecaUnMarshaller());

        String[] fragmentRootElementNames = new String[] {"AppIBTransactionHeader"};
        reader.setFragmentRootElementNames(fragmentRootElementNames);
        reader.setSaveState(false);

        return reader;
    }

    @Bean
    public Unmarshaller mecaUnMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setPackagesToScan(ObjectFactory.class.getPackage().getName());
        return marshaller;
    }

    @Bean
    public Marshaller uberMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setClassesToBeBound(ServiceRequestType.class);
        marshaller.setSupportJaxbElementClass(true);
        return marshaller;
    }

    @Bean(destroyMethod="") // To stop multiple close calls, see: 
    @StepScope
    public ResourceAwareItemWriterItemStream<JAXBElement<ServiceRequestType>> writer(@Value("#{jobParameters[outputXmlFilePath]}")
    String outputXmlFilePath) {
        SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>> writer = new SyncStaxEventItemWriter<JAXBElement<ServiceRequestType>>();

        writer.setResource(new FileSystemResource(outputXmlFilePath));
        writer.setMarshaller(uberMarshaller());
        writer.setSaveState(false);
        HashMap<String, String> rootElementAttribs = new HashMap<String, String>();
        rootElementAttribs.put("xmlns:ns1", "http://some.org/corporate/message/2010/1");
        writer.setRootElementAttributes(rootElementAttribs);
        writer.setRootTagName("ns1:SetOfServiceRequests");

        return writer;
    }

    @Bean
    @StepScope
    public <T> ItemProcessor<T, JAXBElement<ServiceRequestType>> appNotificationProcessor() {
        return new AppBatchNotificationItemProcessor<T>();
    }

    @Bean
    @Scope(scopeName=ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public ItemProcessor<JAXBElement<AppIBTransactionHeaderType>, Boolean> appBatchCreationProcessor() {
        return new AppBatchCreationItemProcessor();
    }


    public String[] getAppRootElementNames() {        
        //get list of App Transaction Element Names        
        return AppProcessorEnum.getValues();         
    }

    @Bean
    public Step AppStep() {
        // INPUT_XML_FILE_PATH_PLACEHOLDER and OUTPUT_XML_FILE_PATH_PLACEHOLDER will be overridden 
        // by injected jobParameters using late binding (StepScope)
        return stepBuilderFactory.get("AppStep")
                .<Object, JAXBElement<ServiceRequestType>> chunk(10)
                .reader(appXmlReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
                .processor(appNotificationProcessor())
                .writer(writer(OUTPUT_XML_FILE_PATH_PLACEHOLDER))
                .taskExecutor(concurrentTaskExecutor())
                .throttleLimit(1)
                .build();

    }

    @Bean
    public Step BatchCreationStep() {
        return stepBuilderFactory.get("BatchCreationStep")
                .<JAXBElement<AppIBTransactionHeaderType>, Boolean>chunk(1)
                .reader(appXmlTransactionHeaderReader(INPUT_XML_FILE_PATH_PLACEHOLDER))
                .processor(appBatchCreationProcessor())
                .taskExecutor(concurrentTaskExecutor())
                .throttleLimit(10)
                .build();
    }

    @Bean
    public Job AppJob() {
        return jobBuilderFactory.get("AppJob")
                .incrementer(new RunIdIncrementer())
                .listener(AppJobCompletionNotificationListener())
                .flow(AppStep())
                .next(BatchCreationStep())
                .end()
                .build();
    }

    @Bean
    public JobCompletionNotificationListener AppJobCompletionNotificationListener() {
        return new JobCompletionNotificationListener();
    }

    @Bean
    public TaskExecutor concurrentTaskExecutor() {
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        taskExecutor.setConcurrencyLimit(10);
        return taskExecutor;
    }
}

在所有线程中使用相同的 appBatchCreationProcessor() 的原因是因为它被注入到单例 BatchCreationStep() 中。此外, BatchCreationStep() 也被注入到单例中 AppJob()

根据this documentation

When you use singleton-scoped beans with dependencies on prototype beans, be aware that dependencies are resolved at instantiation time. Thus if you dependency-inject a prototype-scoped bean into a singleton-scoped bean, a new prototype bean is instantiated and then dependency-injected into the singleton bean. The prototype instance is the sole instance that is ever supplied to the singleton-scoped bean.

但是,如果您确实需要创建一个新的 appBatchCreationProcessor(),您可以使用 method injection

是的,这是设计使然。

将作业及其步骤及其 reader、处理器和编写器视为在作业执行之前创建的静态结构。这意味着,在执行作业之前,已经调用了适当的 createReader、createProcessor 方法并且已经创建了 bean 实例。

SCOPE_PROTOTYPE 在此阶段进行评估,并且由于您的 createProcessor 方法在此阶段仅被调用一次,因此只有一个实例。

作业启动后,该结构保持"stable"。

现在,Spring Batch 通过提供 "stepscope" 将 bean 创建推迟到步骤的开始来稍微调整一下。但是,如果您 运行 您的步骤有多个线程,这将无济于事。仍然只有一个实例,比方说您示例中的处理器,并且该实例用于所有线程。

您需要的是 "ThreadScope" 之类的东西,但 spring 或 spring 批次中没有这样的概念。您需要相应地实现您的处理器,例如通过使用 ThreadLocal 成员。

例如,您可以将处理器包装成这样:

public class ThreadLocalItemProcessor implements ItemProcessor {

    private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(() -> new MyProcessor());

    @Override
    public Object process(Object item) throws Exception {
        return threadProcessor.get().process(item);
    }
}

编辑:带有原型方法的示例

如果你的 Processor 被实例化为一个 SpringBean,它也可以使用 Autowired 进行注入。因此,您可以注入一个原型工厂(当然,原型工厂必须实例化为 springbean),如下所示:

@Configuration
public class PrototypeFactory {
    @Bean
    @Scope(Prototype)
    public YourInterfaceOrClass createInstance() {
         return new YourInterfaceOrClass();
    }
}

public class ThreadLocalItemProcessor implements ItemProcessor {

    @Autowired
    private PrototypeFactory prototypeFactory;

    private ThreadLocal<ItemProcessor> threadProcessor = ThreadLocal.withInitial(this::processorCreator);

    @Override
    public Object process(Object item) throws Exception {
        return threadProcessor.get().process(item);
    }

    //ItemProcessor directly implemented as lambda
    // this will only be called once per working thread
    private Object process(Object input) {

       // will produce a valid SpringBean instance
       YourInterfaceOrClass inst = prototypeFactory.createInstance();

       ... process the input           
    }
}