由多个线程重新启动 spring 批处理作业 运行

Restarting a spring batch job that is run by multiple threads

我们有一个要求,我们从文件中读取、处理它并写入平面文件。我的问题是 FlatFileItemReader 会跟踪它处理的记录,以便如果作业在中间失败,它可以从失败的地方恢复。

例如,假设节流限制为 2,提交间隔为 10,我的文件有 20 条记录。假设线程 1 正在处理前 10 条记录,并且 线程 2 正在处理接下来的 10 条记录。如果 thread2 的所有 10 条记录都被成功处理,而 thread1 由于一条错误记录而失败,因此整个作业失败。下次 当作业重新启动时,spring 将如何识别未处理的记录?

有什么更好的方法可以使用多线程处理文件,同时在中间失败时能够重新开始。

<batch:job job-repository="jobRepository" id="insertIntoCsvFromCsvJob">
        <batch:step id="step1">
            <batch:tasklet transaction-manager="transactionManager"
                task-executor="taskExecutor" throttle-limit="${throttle-limit}">
                <batch:chunk reader="csvFileItemReader" writer="customWriter" processor="compositeProcessor
                    commit-interval="${commit-interval}" >
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="csvFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader">
         <property name="resource" value="classpath:files/input.csv" />         
        <property name="lineMapper" ref="fieldSetMapper" />
    </bean>

    <bean id="csvFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:c:/outout.csv" />
        <property name="shouldDeleteIfExists" value="true" />
        <property name="lineAggregator" ref="lineAggregator" />
    </bean>

    <bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

不,不会。

我什至会说你的代码迟早会出错。问题在于 FlatFileItemReader(分别是 FlatFileItemWriter)的读取(分别是写入方法)不是线程安全的。

如果您想异步使用它们,您需要实现一个包装器 ItemWriter 和 ItemReader,以同步对 FlatFileItemReader/Writer 的调用。

但是,当然,整个中间重启是行不通的,因为如果只使用 FlatFileItemReader/Writer 的标准实现,则无法保证块的顺序。问题是一个块可能会超越另一个块,导致执行上下文中的读取位置指针在被超越的块之后移动。但是如果被接管的chunk失败了,执行上下文中的位置会表明失败的chunk的条目已经被成功处理。

当然,您也可以在适配器中实现您的逻辑,您可以在适配器中跟踪已处理的条目,并且仅在您知道之前的所有条目都已处理并已写入时才向前移动位置指针。