将 MultiResourceItemReader 中的每个资源的 CompletionPolicy 扩展到 complete/commit

Extending CompletionPolicy to complete/commit per resource from MultiResourceItemReader

我的问题是我不了解各种 spring 批处理上下文。参考文档解释了如何将数据传递给后续步骤。但是如何在一个步骤中在 reader 和 writer 组件之间传递数据。步骤上下文。有块上下文吗? 我之前在写 Partitioner 时使用过 before the execution context。但是那些是并行执行的。

我现在需要做一个有序的操作。它基本上是一个 jdbc 导入作业,但每个文件都需要提交,否则它们是外键约束意志。

我能够获取单个文件资源的行数的最简单的地方是在 MultiResourceItemReader 委托给 ItemReader 之前。但是看过各种 CompletionPolicy 实现后,他们似乎只能访问 RepeatContext。我如何将值存储在 MultiResourceItemReader 的 RepeatContext 中,以便我的 CompletionPolicy 可以访问它并在特定文件资源行计数后提交。

有关如何扩展抽象 CountingCompletionPolicy 以及如何在何处存储来自 MultiResourceItemReader 的数据的示例会有所帮助。

或者也许有更好的方法来处理这类工作。

<!-- <job id="job" restartable="${restartable}" xmlns="http://www.springframework.org/schema/batch"> -->
    <batch:job id="job" restartable="true"
        xmlns="http://www.springframework.org/schema/batch">
        <batch:step id="step1-unzipFile">
            <batch:tasklet ref="unzipFileTasklet" />
            <batch:next on="COMPLETED" to="step2-import" />
        </batch:step>
        <batch:step id="step2-import"> <!-- we can't use a commit-interval="${commitInterval} cause it messes with 2nd pass import processing if the commit ends up being the middle of the file -->
            <batch:tasklet>
                <batch:chunk reader="multiResourceReader" writer="itemWriter" chunk-completion-policy="completionPolicy"/>  
            </batch:tasklet>
            <!-- <batch:next on="COMPLETED" to="step3-fileCleanUp" /> -->
        </batch:step>
        <!-- <batch:step id="step3-fileCleanUp">
            <batch:tasklet ref="fileCleanUpTasklet" />
        </batch:step> -->
    </batch:job>




    <bean id="multiResourceReader" class="springbatch.iimport.extended.SequentialLoaderMultiFileResourceItemReader" scope="step">
        <property name="resourceDirectoryPath" value="${importTempDirectoryBasePath}/#{jobParameters['jobKey']}/"/>
        <property name="delegate" ref="itemReader"/>
    </bean>

    <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="lineMapper" ref="lineMapper" />
    </bean>

    <bean id="lineMapper" class="springbatch.iimport.extended.JsonToTupleLineMapper"/>

    <bean id="itemWriter" class="springbatch.iimport.extended.TupleJdbcBatchItemWriter" scope="step">
        <property name="moduleDataSource" ref="moduleDataSource"/>
        <property name="dataSource" ref="dataSource"/>
        <property name="jobKey" value="#{jobParameters['jobKey']}"/>
        <property name="jobDef" value="#{jobParameters['jobDef']}"/>
    </bean>

    <bean id="completionPolicy" class="?"/> 

    <!-- tasklets -->

    <bean id="unzipFileTasklet" class="springbatch.iimport.tasklets.UnZipFile" scope="step">
        <!-- the temp directory the files are unzipped to end up being #{jobParameters['importZipFileName']} -->
        <property name="importZipFileName" value="${uploadDir}/#{jobParameters['importZipFileName']}" />
        <property name="jobKey" value="#{jobParameters['jobKey']}"/>
        <property name="importTempDirectoryBasePath" value="${importTempDirectoryBasePath}" />
     </bean>

您可以编写自己的 CompletionPolicy 前瞻 itemReader(使用 PeekableItemReader)和 return 当前块作为 'completed' 如果下次调用到 itemReader.next() returns null (表示当前文件的 EOF)。
请记住:此解决方案可能会导致您因读取完整的内存文件内容而出现内存问题。

public class EOFCompletionPolicy extends CompletionPolicySupport
{
    private EOFCompletionContext cc;
    private PeekableItemReader<Object> reader;

    public void setReader(PeekableItemReader<Object> forseeingReader)
    {
        this.reader = forseeingReader;
    }

    @Override
    public boolean isComplete(RepeatContext context)
    {
        return this.cc.isComplete();
    }

    @Override
    public RepeatContext start(RepeatContext context)
    {
        this.cc = new EOFCompletionContext(context);
        return cc;
    }

    @Override
    public void update(RepeatContext context)
    {
        this.cc.update();
    }

    protected class EOFCompletionContext extends RepeatContextSupport
    {
        boolean eof = false;
        public EOFCompletionContext (RepeatContext context)
        {
            super(context);
        }

        public void update()
        {
            final Object next;
            try
            {
                next = reader.peek();
            }
            catch (Exception e)
            {
                throw new NonTransientResourceException("Unable to peek", e);
            }
            // EOF?
            this.eof = (next == null);
        }

        public boolean isComplete() {
            return this.eof;
        }
    }
}