Spring 集成文件拆分器内存使用情况
Spring Integration File Splitter Memory Usage
split()
对 File object
的操作是否缓冲并执行每一行,或者所有行是否立即加载到内存中?这是为了了解如果文件恰好包含 100,000+ 行时的内存使用情况。
transformer
到 return void
公平吗?用法是从有效负载 & headers 计算一些逻辑,然后将计算值添加到 headers。有没有更好的方法?
谢谢
更新:
return IntegrationFlows.from(fileReadingMessageSource(), p -> p.poller(pollerSpec()))
.enrichHeaders(Collections.singletonMap(ERROR_CHANNEL, appErrorChannel))
.split() // process file by file
.log(INFO, message -> "Started File: " + message.getHeaders().get("file_name"))
.enrichHeaders(h -> h.headerFunction("foo", m -> integrationUtil.constructFoo())) // fooobject
.split(fileSplitterSpec()) // split file lines
.filter(payload -> !(payload instanceof FileSplitter.FileMarker), e -> e.discardChannel("aggregatorChannel"))
.log(INFO, message -> "Payload: " + message.getPayload())
.transform(barTransformer)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.log(INFO, message -> "Completed File: " + message.getHeaders().get("file_name"))
.aggregate()
.log(INFO, message -> "All Files Processed")
// .handle(null)
.get();
是的,这确实是 FileSplitter
的目的。它的内部逻辑基于 FileIterator
,它逐行读取并将其发送到分离器输出通道。
不行,变压器不能returnvoid
。这不是它的目的。听起来 enrichHeaders()
更适合你。消息是不可变的,您只是不能修改当前消息以获得可能的进一步逻辑。您使用新数据(或 header)构建一条新消息,并将其作为回复发送到流程下游。
split()
对File object
的操作是否缓冲并执行每一行,或者所有行是否立即加载到内存中?这是为了了解如果文件恰好包含 100,000+ 行时的内存使用情况。transformer
到 returnvoid
公平吗?用法是从有效负载 & headers 计算一些逻辑,然后将计算值添加到 headers。有没有更好的方法?
谢谢
更新:
return IntegrationFlows.from(fileReadingMessageSource(), p -> p.poller(pollerSpec()))
.enrichHeaders(Collections.singletonMap(ERROR_CHANNEL, appErrorChannel))
.split() // process file by file
.log(INFO, message -> "Started File: " + message.getHeaders().get("file_name"))
.enrichHeaders(h -> h.headerFunction("foo", m -> integrationUtil.constructFoo())) // fooobject
.split(fileSplitterSpec()) // split file lines
.filter(payload -> !(payload instanceof FileSplitter.FileMarker), e -> e.discardChannel("aggregatorChannel"))
.log(INFO, message -> "Payload: " + message.getPayload())
.transform(barTransformer)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.log(INFO, message -> "Completed File: " + message.getHeaders().get("file_name"))
.aggregate()
.log(INFO, message -> "All Files Processed")
// .handle(null)
.get();
是的,这确实是 FileSplitter
的目的。它的内部逻辑基于 FileIterator
,它逐行读取并将其发送到分离器输出通道。
不行,变压器不能returnvoid
。这不是它的目的。听起来 enrichHeaders()
更适合你。消息是不可变的,您只是不能修改当前消息以获得可能的进一步逻辑。您使用新数据(或 header)构建一条新消息,并将其作为回复发送到流程下游。