并行处理 Spring 批处理 StaxEventItemReader

Parellel Processing Spring Batch StaxEventItemReader

我有一个 spring 批处理作业,定义如下。

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="contentItemReader" writer="contentItemWriter"
                             processor="processor" commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

contentItemReader如下

 @Bean
 public StaxEventItemReader contentItemReader() {
        StaxEventItemReader reader = new StaxEventItemReader();
        reader.setFragmentRootElementName("ContentItem");
        reader.setResource(new FileSystemResource(baseDirectory.concat(inputFile)));
        reader.setUnmarshaller(contentItemUnmarshaller());
        return reader;
 }

除了比我希望的慢一点之外,一切都很好。我知道这个 reader 不是线程安全的。所以我不认为我可以在 tasklet 中添加一个 taskExecutor。 ContentItems 彼此不依赖,因此我想将数据并行输入处理器。 ItemProcessing 可能相当耗时。所以虽然我知道我不能多线程reader,但我应该可以多线程处理项目。

ItemWriters 也需要是单线程的,因为我使用的是平面文件 ItemWriter。

完成此任务的最佳方法是什么?

只需将您的 reader 包装成这样:

public class SynchronizedWrapperReader<T> implements ItemStreamReader<T> {

  private ItemReader<T> itemReader;
  private boolean isStream = false;

  public void setItemReader(ItemReader<T> itemReader) {
    this.itemReader = itemReader;
    if (itemReader instanceof ItemStream) {
      isStream = true;
    }
  }

  @Override
  public void close() {
    if (isStream) {
      ((ItemStream) itemReader).close();
    }
  }

  @Override
  public void open(ExecutionContext executionContext) {
    if (isStream) {
      ((ItemStream) itemReader).open(new ExecutionContext());
    }
  }

  @Override
  public void update(ExecutionContext executionContext) {
  }

  @Override
  public synchronized T read() throws Exception {
    return itemReader.read();
  }
}

你的作家也一样。

请注意,订单不再有保证。

已编辑:

config.xml 中有关于如何使用它的评论。因此,这是一个简单的示例,说明如何将 Wrapper 与 FlatFileItemReader 一起使用:

<batch:step id="convert">
    <batch:tasklet >
        <batch:chunk reader="wrappedReader" writer="..."
                             processor="..." commit-interval="10000" >
        </batch:chunk>
     </batch:tasklet>
</batch:step>

<bean id="wrappedReader" class=[package].SynchronizedWrapperReader">
   <property name="itemReader">
      <bean class="org.springframework.batch.item.file.FlatFileItemReader">
          <property .../>
          <property .../>
      </bean>
   </property>
</bean>

自版本 3.0.4 Spring Batch 提供开箱即用的包装器 class(如 Hansjoerg Wingeier):SynchronizedItemStreamReader<T>