Spring 多次执行一个步骤的批处理并行处理
Spring Batch Parallel Processing executing one step multiple times
我正在并行执行 spring 批处理作业,并使用 SimpleAsyncTaskExecutor 进行并行处理,默认限制为 4(默认为 4)。
项目 reader 正在从文本文件中读取行,然后进行处理。
但是发生的事情是文本文件中的一行正在被 4 个不同的线程处理,使其执行一个块 4 次。
下面是我的batch.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="classpath*:/META-INF/spring/batch/override/**/*.xml" />
<bean id="businessReader" class="com.rbsgbm.rates.eodtasks.batch.reader.BusinessItemReader"/>
<bean id="businessProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.BusinessItemProcessor" />
<bean id="businessWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.BusinessItemWriter" />
<bean id="deskReader" class="com.rbsgbm.rates.eodtasks.batch.reader.DeskItemReader"/>
<bean id="deskProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.DeskItemProcessor" />
<bean id="deskWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.DeskItemWriter" />
<bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.TradeSnapTasklet" id="tradeSnapTasklet"/>
<bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.FoundryExtractTasklet" id="foundryExtractTasklet"/>
<bean id="simpleFireTasklet"
class="com.rbsgbm.rates.eodtasks.batch.Tasklet.SimpleFireTasklet" />
<bean id="mdxMarketDataSnapTasklet"
class="com.rbsgbm.rates.eodtasks.batch.Tasklet.MdxMarketDataSnapTasklet" />
<bean id="stepListener" class="org.springframework.batch.core.listener.StepExecutionListenerSupport" />
<bean id="restartJobListener" class="com.rbsgbm.rates.eodtasks.batch.listener.RestartListener"/>
<bean id="failedStepListener" class="com.rbsgbm.rates.eodtasks.batch.listener.FailedStepStepExecutionListener"/>
<bean id="taskExecutor"
class="org.springframework.core.task.SimpleAsyncTaskExecutor">
</bean>
<job id="simpleDojJob" xmlns="http://www.springframework.org/schema/batch">
<step id="processBusiness" next="simpleFireTask">
<tasklet>
<chunk reader="businessReader" processor="businessProcessor"
writer="businessWriter" commit-interval="1" />
</tasklet>
</step>
<step id="simpleFireTask" next="foundryTask">
<tasklet task-executor="taskExecutor">
<chunk reader="deskReader" processor="deskProcessor"
writer="deskWriter" commit-interval="1" />
</tasklet>
</step>
<step id="foundryTask">
<tasklet ref="foundryExtractTasklet"/>
<listeners>
<listener ref="stepListener"/>
<listener ref="restartJobListener"/>
<listener ref="failedStepListener"/>
</listeners>
</step>
</job>
</beans>
如果你想拥有线程安全的 Reader 和 Writers,你必须这样实现它们。
默认情况下,每个线程都可能在同一时刻访问您的 reader 或编写器的同一实例。如果您的 reader 和 writer 没有为此实现,它将无法正确处理它。
确保它们是线程安全的最简单的方法是将 reader 分别标记为同步的编写器方法。
如果您无法更改 Reader/Writer 的代码,只需实现一个简单的包装器并委托给您的 Reader/Writer:
public class SynchronizedItemReader<T> implements ItemReader<T>
{
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) {this.delegate = delegate};
public synchronized T read() {
return delegate.read();
}
}
但请注意:如果您还实现了 ItemStream 来跟踪作者已成功提交的内容(因此能够在该位置重新启动),您还需要对其进行管理,因为块可以相互超越。
我正在并行执行 spring 批处理作业,并使用 SimpleAsyncTaskExecutor 进行并行处理,默认限制为 4(默认为 4)。 项目 reader 正在从文本文件中读取行,然后进行处理。 但是发生的事情是文本文件中的一行正在被 4 个不同的线程处理,使其执行一个块 4 次。
下面是我的batch.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="classpath*:/META-INF/spring/batch/override/**/*.xml" />
<bean id="businessReader" class="com.rbsgbm.rates.eodtasks.batch.reader.BusinessItemReader"/>
<bean id="businessProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.BusinessItemProcessor" />
<bean id="businessWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.BusinessItemWriter" />
<bean id="deskReader" class="com.rbsgbm.rates.eodtasks.batch.reader.DeskItemReader"/>
<bean id="deskProcessor" class="com.rbsgbm.rates.eodtasks.batch.processor.DeskItemProcessor" />
<bean id="deskWriter" class="com.rbsgbm.rates.eodtasks.batch.writer.DeskItemWriter" />
<bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.TradeSnapTasklet" id="tradeSnapTasklet"/>
<bean class="com.rbsgbm.rates.eodtasks.batch.Tasklet.FoundryExtractTasklet" id="foundryExtractTasklet"/>
<bean id="simpleFireTasklet"
class="com.rbsgbm.rates.eodtasks.batch.Tasklet.SimpleFireTasklet" />
<bean id="mdxMarketDataSnapTasklet"
class="com.rbsgbm.rates.eodtasks.batch.Tasklet.MdxMarketDataSnapTasklet" />
<bean id="stepListener" class="org.springframework.batch.core.listener.StepExecutionListenerSupport" />
<bean id="restartJobListener" class="com.rbsgbm.rates.eodtasks.batch.listener.RestartListener"/>
<bean id="failedStepListener" class="com.rbsgbm.rates.eodtasks.batch.listener.FailedStepStepExecutionListener"/>
<bean id="taskExecutor"
class="org.springframework.core.task.SimpleAsyncTaskExecutor">
</bean>
<job id="simpleDojJob" xmlns="http://www.springframework.org/schema/batch">
<step id="processBusiness" next="simpleFireTask">
<tasklet>
<chunk reader="businessReader" processor="businessProcessor"
writer="businessWriter" commit-interval="1" />
</tasklet>
</step>
<step id="simpleFireTask" next="foundryTask">
<tasklet task-executor="taskExecutor">
<chunk reader="deskReader" processor="deskProcessor"
writer="deskWriter" commit-interval="1" />
</tasklet>
</step>
<step id="foundryTask">
<tasklet ref="foundryExtractTasklet"/>
<listeners>
<listener ref="stepListener"/>
<listener ref="restartJobListener"/>
<listener ref="failedStepListener"/>
</listeners>
</step>
</job>
</beans>
如果你想拥有线程安全的 Reader 和 Writers,你必须这样实现它们。
默认情况下,每个线程都可能在同一时刻访问您的 reader 或编写器的同一实例。如果您的 reader 和 writer 没有为此实现,它将无法正确处理它。
确保它们是线程安全的最简单的方法是将 reader 分别标记为同步的编写器方法。
如果您无法更改 Reader/Writer 的代码,只需实现一个简单的包装器并委托给您的 Reader/Writer:
public class SynchronizedItemReader<T> implements ItemReader<T>
{
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) {this.delegate = delegate};
public synchronized T read() {
return delegate.read();
}
}
但请注意:如果您还实现了 ItemStream 来跟踪作者已成功提交的内容(因此能够在该位置重新启动),您还需要对其进行管理,因为块可以相互超越。