多行 AggregateItemReader 未按照 spring-batch-samples 中的建议工作

Multiline AggregateItemReader not working as suggested in spring-batch-samples

我正在尝试在 https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#multiline

的 spring-batch-sample 之后使用多行 IteamReader

我 运行 编译错误如下 -

我确定有一些与泛型相关的东西,因为它正在寻找 class 实现 ItemReader 但 AggregateItemReader 实现 ItemReader

public class AggregateItemReader<T> implements ItemReader<List<T>> {

你可以在这里找到我的代码 - https://github.com/arpit9mittal/spring-batch-demo/blob/master/src/main/java/my/demo/batch/BatchConfiguration.java

更新:

我抑制了泛型并更新了 AggregateItemReader 如下,以便调用 ItemStreamReader open() 方法。

public class AggregateItemReader<T> implements ItemStreamReader<List<T>> {
private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);

private ItemStreamReader<AggregateItem<T>> itemReader;

我注意到 ItemWriter 正在编写记录列表而不是每行记录

[Trade: [isin=UK21341EAH45,quantity=978,price=98.34,customer=customer1], Trade: [isin=UK21341EAH46,quantity=112,price=18.12,customer=customer2]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer3], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer4], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer5]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer6], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer7], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer8]]

并且当我尝试添加处理器时,它抱怨处理器无法将列表转换为贸易对象。

@Bean
public ItemProcessor<Trade, Trade> processor() {
    return new ItemProcessor<Trade, Trade>() {

        @Override
        public Trade process(Trade item) throws Exception {
            item.setProcessed(true);
            return item;
        }
    };
}

@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Step multilineStep(
        AggregateItemReader reader, 
        ItemProcessor processor,
        FlatFileItemWriter writer,
        StepItemReadListener itemReadListener) {
    return stepBuilderFactory.get("multiLineStep")
            .chunk(1)
            .reader(reader)
            .writer(writer)
            .processor(processor)
            .build();
}

错误:

java.lang.ClassCastException: java.util.ArrayList cannot be cast to my.demo.batch.multiline.Trade
    at my.demo.batch.BatchConfiguration.process(BatchConfiguration.java:1) ~[main/:na]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.3.jar:4.3.3]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.3.jar:4.3.3]

帮助:

  1. 我们如何在不抑制泛型的情况下让它发挥作用?
  2. 如何确保 ItemReader returns 以与块处理相同的方式列出,以便 ItemProcessor 和 ItemWriter 照常工作?
  3. 是否可以在不扩展 SimpleStepBuilder 和 SimpleChunkProvider 的情况下这样做?

您需要在要在批次级别处理的项目类型方面保持一致。根据您的步骤定义,它是 Trade。通过在步骤生成器上调用 <Trade, Trade>chunk(1),您声明您的批处理应该读取块大小为 1 的 Trade 类型的项目(即一次一个)并将这些传递给编写器以获取项目输入 Trade。在这种情况下,您需要提供类型为 ItemReader<Trade> 的 reader、类型为 ItemWriter<Trade> 的编写器和可选的类型为 ItemProcessor<Trade, Trade>.

的处理器

问题是您的 reader 是 ItemReader<List<Trade>> 类型,即它不会在每次调用其 read 方法时产生 Trade,而是 交易列表

如果你想使用 AggregateItemReader,你需要将它包装到一个自定义的 reader 中,作为适配器工作,实际上 returns Trade 项目而不是 List<Trade>.

例如,自定义 read 方法可能如下所示:

public Trade read() throws Exception {
    if (queue.isEmpty()) {
        List<Trade> trades = aggregateItemReader.read();
        if (trades != null) {
            queue.addAll(trades);
        }
    }
    return queue.poll();
}

queue初始化为

private Deque<Trade> queue = new ArrayDeque<>();