并行处理 Spring 批处理 StaxEventItemReader
Parellel Processing Spring Batch StaxEventItemReader
我有一个 spring 批处理作业,定义如下。
<batch:step id="convert">
<batch:tasklet >
<batch:chunk reader="contentItemReader" writer="contentItemWriter"
processor="processor" commit-interval="10000" >
</batch:chunk>
</batch:tasklet>
</batch:step>
contentItemReader如下
@Bean
public StaxEventItemReader contentItemReader() {
StaxEventItemReader reader = new StaxEventItemReader();
reader.setFragmentRootElementName("ContentItem");
reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
reader.setUnmarshaller(contentItemUnmarshaller());
return reader;
}
除了比我希望的慢一点之外,一切都很好。我知道这个 reader 不是线程安全的。所以我不认为我可以在 tasklet 中添加一个 taskExecutor。 ContentItems 彼此不依赖,因此我想将数据并行输入处理器。 ItemProcessing 可能相当耗时。所以虽然我知道我不能多线程reader,但我应该可以多线程处理项目。
ItemWriters 也需要是单线程的,因为我使用的是平面文件 ItemWriter。
完成此任务的最佳方法是什么?
只需将您的 reader 包装成这样:
public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {
private ItemReader<T> itemReader;
private boolean isStream = false;
public void setItemReader(ItemReader<T> itemReader) {
this.itemReader = itemReader;
if (itemReader instanceof ItemStream) {
isStream = true;
}
}
@Override
public void close() {
if (isStream) {
((ItemStream) itemReader).close();
}
}
@Override
public void open(ExecutionContext executionContext) {
if (isStream) {
((ItemStream) itemReader).open(new ExecutionContext());
}
}
@Override
public void update(ExecutionContext executionContext) {
}
@Override
public synchronized T read() throws Exception {
return itemReader.read();
}
}
你的作家也一样。
请注意,订单不再有保证。
已编辑:
config.xml 中有关于如何使用它的评论。因此,这是一个简单的示例,说明如何将 Wrapper 与 FlatFileItemReader 一起使用:
<batch:step id="convert">
<batch:tasklet >
<batch:chunk reader="wrappedReader" writer="..."
processor="..." commit-interval="10000" >
</batch:chunk>
</batch:tasklet>
</batch:step>
<bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
<property name="itemReader">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property .../>
<property .../>
</bean>
</property>
</bean>
自版本 3.0.4
Spring Batch 提供开箱即用的包装器 class(如 Hansjoerg Wingeier):SynchronizedItemStreamReader<T>
我有一个 spring 批处理作业,定义如下。
<batch:step id="convert">
<batch:tasklet >
<batch:chunk reader="contentItemReader" writer="contentItemWriter"
processor="processor" commit-interval="10000" >
</batch:chunk>
</batch:tasklet>
</batch:step>
contentItemReader如下
@Bean
public StaxEventItemReader contentItemReader() {
StaxEventItemReader reader = new StaxEventItemReader();
reader.setFragmentRootElementName("ContentItem");
reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
reader.setUnmarshaller(contentItemUnmarshaller());
return reader;
}
除了比我希望的慢一点之外,一切都很好。我知道这个 reader 不是线程安全的。所以我不认为我可以在 tasklet 中添加一个 taskExecutor。 ContentItems 彼此不依赖,因此我想将数据并行输入处理器。 ItemProcessing 可能相当耗时。所以虽然我知道我不能多线程reader,但我应该可以多线程处理项目。
ItemWriters 也需要是单线程的,因为我使用的是平面文件 ItemWriter。
完成此任务的最佳方法是什么?
只需将您的 reader 包装成这样:
public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {
private ItemReader<T> itemReader;
private boolean isStream = false;
public void setItemReader(ItemReader<T> itemReader) {
this.itemReader = itemReader;
if (itemReader instanceof ItemStream) {
isStream = true;
}
}
@Override
public void close() {
if (isStream) {
((ItemStream) itemReader).close();
}
}
@Override
public void open(ExecutionContext executionContext) {
if (isStream) {
((ItemStream) itemReader).open(new ExecutionContext());
}
}
@Override
public void update(ExecutionContext executionContext) {
}
@Override
public synchronized T read() throws Exception {
return itemReader.read();
}
}
你的作家也一样。
请注意,订单不再有保证。
已编辑:
config.xml 中有关于如何使用它的评论。因此,这是一个简单的示例,说明如何将 Wrapper 与 FlatFileItemReader 一起使用:
<batch:step id="convert">
<batch:tasklet >
<batch:chunk reader="wrappedReader" writer="..."
processor="..." commit-interval="10000" >
</batch:chunk>
</batch:tasklet>
</batch:step>
<bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
<property name="itemReader">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property .../>
<property .../>
</bean>
</property>
</bean>
自版本 3.0.4
Spring Batch 提供开箱即用的包装器 class(如 Hansjoerg Wingeier):SynchronizedItemStreamReader<T>