Spring 批处理 - 使用带有列表列表的 ItemWriter
Spring Batch - Using an ItemWriter with List of Lists
我们的处理器 returns 一个 List<?>
(有效地传递一个 List<List<?>>
)到我们的 ItemWriter
。
现在,我们观察到 JdbcBatchItemWriter
没有被编程来处理 item instanceof List
。我们还观察到处理项目 instanceof List
;我们需要写一个自定义 ItemSqlParameterSourceProvider
。
但可悲的是,它 returns SqlParameterSource
只能处理一个 item
而无法处理 List
。
那么,有人可以帮助我们了解如何处理 JdbcBatchItemWriter
中的列表列表吗?
通常,设计模式是:
Reader -> reads something, returns ReadItem
Processor -> ingests ReadItem, returns ProcessedItem
Writer -> ingests List<ProcessedItem>
如果您的处理器正在返回 List<Object>
,那么您需要您的 Writer 期望 List<List<Object>>
。
您可以通过将 JdbcBatchItemWriter
包装为 ItemWriter 中的委托来实现此目的,如下所示:
public class ListUnpackingItemWriter<T> implements ItemWriter<List<T>>, ItemStream, InitializingBean {
private ItemWriter<T> delegate;
@Override
public void write(final List<? extends List<T>> lists) throws Exception {
final List<T> consolidatedList = new ArrayList<>();
for (final List<T> list : lists) {
consolidatedList.addAll(list);
}
delegate.write(consolidatedList);
}
@Override
public void afterPropertiesSet() {
Assert.notNull(delegate, "You must set a delegate!");
}
@Override
public void open(ExecutionContext executionContext) {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).open(executionContext);
}
}
@Override
public void update(ExecutionContext executionContext) {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).update(executionContext);
}
}
@Override
public void close() {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).close();
}
}
public void setDelegate(ItemWriter<T> delegate) {
this.delegate = delegate;
}
}
public class ListUnpackingItemWriter<T> implements FlatFileItemWriter<List<T>>, ItemStream, InitializingBean {
@Override
public void afterPropertiesSet() {
setLineAggregator(item -> String.join("\n", item.stream().map(T::toString).collect(Collectors.toList())));
}
}
刚刚在上述解决方案中添加了自定义行聚合器,这有助于使用 FlatFileItemWriter<List<T>>
将内容写入文件。您可以将 T
替换为实际的 class 名称,以避免在调用 toString()
方法时出现编译错误。
我们的处理器 returns 一个 List<?>
(有效地传递一个 List<List<?>>
)到我们的 ItemWriter
。
现在,我们观察到 JdbcBatchItemWriter
没有被编程来处理 item instanceof List
。我们还观察到处理项目 instanceof List
;我们需要写一个自定义 ItemSqlParameterSourceProvider
。
但可悲的是,它 returns SqlParameterSource
只能处理一个 item
而无法处理 List
。
那么,有人可以帮助我们了解如何处理 JdbcBatchItemWriter
中的列表列表吗?
通常,设计模式是:
Reader -> reads something, returns ReadItem
Processor -> ingests ReadItem, returns ProcessedItem
Writer -> ingests List<ProcessedItem>
如果您的处理器正在返回 List<Object>
,那么您需要您的 Writer 期望 List<List<Object>>
。
您可以通过将 JdbcBatchItemWriter
包装为 ItemWriter 中的委托来实现此目的,如下所示:
public class ListUnpackingItemWriter<T> implements ItemWriter<List<T>>, ItemStream, InitializingBean {
private ItemWriter<T> delegate;
@Override
public void write(final List<? extends List<T>> lists) throws Exception {
final List<T> consolidatedList = new ArrayList<>();
for (final List<T> list : lists) {
consolidatedList.addAll(list);
}
delegate.write(consolidatedList);
}
@Override
public void afterPropertiesSet() {
Assert.notNull(delegate, "You must set a delegate!");
}
@Override
public void open(ExecutionContext executionContext) {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).open(executionContext);
}
}
@Override
public void update(ExecutionContext executionContext) {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).update(executionContext);
}
}
@Override
public void close() {
if (delegate instanceof ItemStream) {
((ItemStream) delegate).close();
}
}
public void setDelegate(ItemWriter<T> delegate) {
this.delegate = delegate;
}
}
public class ListUnpackingItemWriter<T> implements FlatFileItemWriter<List<T>>, ItemStream, InitializingBean {
@Override
public void afterPropertiesSet() {
setLineAggregator(item -> String.join("\n", item.stream().map(T::toString).collect(Collectors.toList())));
}
}
刚刚在上述解决方案中添加了自定义行聚合器,这有助于使用 FlatFileItemWriter<List<T>>
将内容写入文件。您可以将 T
替换为实际的 class 名称,以避免在调用 toString()
方法时出现编译错误。