多行 AggregateItemReader 未按照 spring-batch-samples 中的建议工作
Multiline AggregateItemReader not working as suggested in spring-batch-samples
我正在尝试在 https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#multiline
的 spring-batch-sample 之后使用多行 IteamReader
我 运行 编译错误如下 -
我确定有一些与泛型相关的东西,因为它正在寻找 class 实现 ItemReader 但 AggregateItemReader 实现 ItemReader。
public class AggregateItemReader<T> implements ItemReader<List<T>> {
你可以在这里找到我的代码 - https://github.com/arpit9mittal/spring-batch-demo/blob/master/src/main/java/my/demo/batch/BatchConfiguration.java
更新:
我抑制了泛型并更新了 AggregateItemReader 如下,以便调用 ItemStreamReader open() 方法。
public class AggregateItemReader<T> implements ItemStreamReader<List<T>> {
private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);
private ItemStreamReader<AggregateItem<T>> itemReader;
我注意到 ItemWriter 正在编写记录列表而不是每行记录
[Trade: [isin=UK21341EAH45,quantity=978,price=98.34,customer=customer1], Trade: [isin=UK21341EAH46,quantity=112,price=18.12,customer=customer2]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer3], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer4], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer5]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer6], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer7], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer8]]
并且当我尝试添加处理器时,它抱怨处理器无法将列表转换为贸易对象。
@Bean
public ItemProcessor<Trade, Trade> processor() {
return new ItemProcessor<Trade, Trade>() {
@Override
public Trade process(Trade item) throws Exception {
item.setProcessed(true);
return item;
}
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Step multilineStep(
AggregateItemReader reader,
ItemProcessor processor,
FlatFileItemWriter writer,
StepItemReadListener itemReadListener) {
return stepBuilderFactory.get("multiLineStep")
.chunk(1)
.reader(reader)
.writer(writer)
.processor(processor)
.build();
}
错误:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to my.demo.batch.multiline.Trade
at my.demo.batch.BatchConfiguration.process(BatchConfiguration.java:1) ~[main/:na]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.3.jar:4.3.3]
帮助:
- 我们如何在不抑制泛型的情况下让它发挥作用?
- 如何确保 ItemReader returns 以与块处理相同的方式列出,以便 ItemProcessor 和 ItemWriter 照常工作?
- 是否可以在不扩展 SimpleStepBuilder 和 SimpleChunkProvider 的情况下这样做?
您需要在要在批次级别处理的项目类型方面保持一致。根据您的步骤定义,它是 Trade
。通过在步骤生成器上调用 <Trade, Trade>chunk(1)
,您声明您的批处理应该读取块大小为 1 的 Trade
类型的项目(即一次一个)并将这些传递给编写器以获取项目输入 Trade
。在这种情况下,您需要提供类型为 ItemReader<Trade>
的 reader、类型为 ItemWriter<Trade>
的编写器和可选的类型为 ItemProcessor<Trade, Trade>
.
的处理器
问题是您的 reader 是 ItemReader<List<Trade>>
类型,即它不会在每次调用其 read
方法时产生 Trade
,而是 交易列表。
如果你想使用 AggregateItemReader
,你需要将它包装到一个自定义的 reader 中,作为适配器工作,实际上 returns Trade
项目而不是 List<Trade>
.
例如,自定义 read
方法可能如下所示:
public Trade read() throws Exception {
if (queue.isEmpty()) {
List<Trade> trades = aggregateItemReader.read();
if (trades != null) {
queue.addAll(trades);
}
}
return queue.poll();
}
queue
初始化为
private Deque<Trade> queue = new ArrayDeque<>();
我正在尝试在 https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#multiline
的 spring-batch-sample 之后使用多行 IteamReader我 运行 编译错误如下 -
我确定有一些与泛型相关的东西,因为它正在寻找 class 实现 ItemReader 但 AggregateItemReader 实现 ItemReader。
public class AggregateItemReader<T> implements ItemReader<List<T>> {
你可以在这里找到我的代码 - https://github.com/arpit9mittal/spring-batch-demo/blob/master/src/main/java/my/demo/batch/BatchConfiguration.java
更新:
我抑制了泛型并更新了 AggregateItemReader 如下,以便调用 ItemStreamReader open() 方法。
public class AggregateItemReader<T> implements ItemStreamReader<List<T>> {
private static final Log LOG = LogFactory.getLog(AggregateItemReader.class);
private ItemStreamReader<AggregateItem<T>> itemReader;
我注意到 ItemWriter 正在编写记录列表而不是每行记录
[Trade: [isin=UK21341EAH45,quantity=978,price=98.34,customer=customer1], Trade: [isin=UK21341EAH46,quantity=112,price=18.12,customer=customer2]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer3], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer4], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer5]]
[Trade: [isin=UK21341EAH47,quantity=245,price=12.78,customer=customer6], Trade: [isin=UK21341EAH48,quantity=108,price=9.25,customer=customer7], Trade: [isin=UK21341EAH49,quantity=854,price=23.39,customer=customer8]]
并且当我尝试添加处理器时,它抱怨处理器无法将列表转换为贸易对象。
@Bean
public ItemProcessor<Trade, Trade> processor() {
return new ItemProcessor<Trade, Trade>() {
@Override
public Trade process(Trade item) throws Exception {
item.setProcessed(true);
return item;
}
};
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public Step multilineStep(
AggregateItemReader reader,
ItemProcessor processor,
FlatFileItemWriter writer,
StepItemReadListener itemReadListener) {
return stepBuilderFactory.get("multiLineStep")
.chunk(1)
.reader(reader)
.writer(writer)
.processor(processor)
.build();
}
错误:
java.lang.ClassCastException: java.util.ArrayList cannot be cast to my.demo.batch.multiline.Trade
at my.demo.batch.BatchConfiguration.process(BatchConfiguration.java:1) ~[main/:na]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:134) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:319) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:210) ~[spring-batch-core-4.3.3.jar:4.3.3]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77) ~[spring-batch-core-4.3.3.jar:4.3.3]
帮助:
- 我们如何在不抑制泛型的情况下让它发挥作用?
- 如何确保 ItemReader returns 以与块处理相同的方式列出,以便 ItemProcessor 和 ItemWriter 照常工作?
- 是否可以在不扩展 SimpleStepBuilder 和 SimpleChunkProvider 的情况下这样做?
您需要在要在批次级别处理的项目类型方面保持一致。根据您的步骤定义,它是 Trade
。通过在步骤生成器上调用 <Trade, Trade>chunk(1)
,您声明您的批处理应该读取块大小为 1 的 Trade
类型的项目(即一次一个)并将这些传递给编写器以获取项目输入 Trade
。在这种情况下,您需要提供类型为 ItemReader<Trade>
的 reader、类型为 ItemWriter<Trade>
的编写器和可选的类型为 ItemProcessor<Trade, Trade>
.
问题是您的 reader 是 ItemReader<List<Trade>>
类型,即它不会在每次调用其 read
方法时产生 Trade
,而是 交易列表。
如果你想使用 AggregateItemReader
,你需要将它包装到一个自定义的 reader 中,作为适配器工作,实际上 returns Trade
项目而不是 List<Trade>
.
例如,自定义 read
方法可能如下所示:
public Trade read() throws Exception {
if (queue.isEmpty()) {
List<Trade> trades = aggregateItemReader.read();
if (trades != null) {
queue.addAll(trades);
}
}
return queue.poll();
}
queue
初始化为
private Deque<Trade> queue = new ArrayDeque<>();