Spring 多次执行一个步骤的批处理并行处理

Spring Batch Parallel Processing executing one step multiple times

我正在并行执行 spring 批处理作业,并使用 SimpleAsyncTaskExecutor 进行并行处理,默认限制为 4(默认为 4)。 项目 reader 正在从文本文件中读取行,然后进行处理。 但是发生的事情是文本文件中的一行正在被 4 个不同的线程处理,使其执行一个块 4 次。

下面是我的batch.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <import resource="classpath*:/META-INF/spring/batch/override/**/*.xml" />
    <bean id="businessReader" class="com.rbsgbm.rates.eodtasks.batch.reader.BusinessItemReader"/>
    <bean id="businessProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.BusinessItemProcessor" />
    <bean id="businessWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.BusinessItemWriter" />
    <bean id="deskReader" class="com.rbsgbm.rates.eodtasks.batch.reader.DeskItemReader"/>
    <bean id="deskProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.DeskItemProcessor" />
    <bean id="deskWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.DeskItemWriter" />
    <bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.TradeSnapTasklet" id="tradeSnapTasklet"/>
    <bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.FoundryExtractTasklet" id="foundryExtractTasklet"/>
    <bean id="simpleFireTasklet"
        class="com.rbsgbm.rates.eodtasks.batch.Tasklet.SimpleFireTasklet" />

    <bean id="mdxMarketDataSnapTasklet"
        class="com.rbsgbm.rates.eodtasks.batch.Tasklet.MdxMarketDataSnapTasklet" />

    <bean id="stepListener" class="org.springframework.batch.core.listener.StepExecutionListenerSupport" />
    <bean id="restartJobListener" class="com.rbsgbm.rates.eodtasks.batch.listener.RestartListener"/>
    <bean id="failedStepListener" class="com.rbsgbm.rates.eodtasks.batch.listener.FailedStepStepExecutionListener"/>
    <bean id="taskExecutor"
        class="org.springframework.core.task.SimpleAsyncTaskExecutor">
    </bean>

    <job id="simpleDojJob"  xmlns="http://www.springframework.org/schema/batch">
        <step id="processBusiness" next="simpleFireTask">
            <tasklet>
                <chunk reader="businessReader" processor="businessProcessor"
                    writer="businessWriter" commit-interval="1" />
            </tasklet>

        </step>

        <step id="simpleFireTask" next="foundryTask">
            <tasklet task-executor="taskExecutor">
                <chunk reader="deskReader" processor="deskProcessor"
                    writer="deskWriter" commit-interval="1" />
            </tasklet>

        </step>

        <step id="foundryTask">
            <tasklet ref="foundryExtractTasklet"/>
            <listeners>
                    <listener ref="stepListener"/>
                    <listener ref="restartJobListener"/>
                    <listener ref="failedStepListener"/>
            </listeners>    
        </step>
    </job>
</beans>

如果你想拥有线程安全的 Reader 和 Writers,你必须这样实现它们。

默认情况下,每个线程都可能在同一时刻访问您的 reader 或编写器的同一实例。如果您的 reader 和 writer 没有为此实现,它将无法正确处理它。

确保它们是线程安全的最简单的方法是将 reader 分别标记为同步的编写器方法。

如果您无法更改 Reader/Writer 的代码,只需实现一个简单的包装器并委托给您的 Reader/Writer:

public class SynchronizedItemReader<T> implements ItemReader<T>
{
    private ItemReader<T> delegate;
    public void setDelegate(ItemReader<T> delegate) {this.delegate = delegate};

    public synchronized T read() {
        return delegate.read();
    }
}

但请注意:如果您还实现了 ItemStream 来跟踪作者已成功提交的内容(因此能够在该位置重新启动),您还需要对其进行管理,因为块可以相互超越。