使用 spring DSL 构建 spring 集成发布策略
Build spring integration release strategy using spring DSL
我是 Spring 集成的新手。我正在尝试使用文件拆分器从文件中拆分消息,然后使用 .aggregate() 构建单个消息并发送到输出通道。
我将标记设置为 true,因此现在默认情况下 apply-sequence 为 false。
我已使用 enrichHeaders 将 correlationId 设置为常量“1”。我无法设置 realease 策略,因为我没有保留序列结束。这是我的代码的样子。
IntegrationFlows
.from(s -> s.file(new File(fileDir))
.filter(getFileFilter(fileName)),
e -> e.poller(poller))
.split(Files.splitter(true, true)
.charset(StandardCharsets.US_ASCII),
e -> e.id(beanName)).enrichHeaders(h -> h.header("correlationId", "1"));
IntegrationFlow integrationFlow = integrationFlowBuilder
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
.channelMapping(String.class, "lines.input"))
.get();
@Bean
public IntegrationFlow itemExcludes() {
return flow -> flow.transform(new ItemExcludeRowMapper(itemExcludeRowUnmarshaller)) //This maps each line to ItemExclude object
.aggregate(aggregator -> aggregator
.outputProcessor(group -> group.getMessages()
.stream()
.map(message -> ((ItemExclude) message.getPayload()).getPartNumber())
.collect(Collectors.joining(","))))
.transform(Transformers.toJson())
.channel(customSource.itemExclude());
}
@Bean
public IntegrationFlow itemExcludeMarkers() {
return flow -> flow
.log(LoggingHandler.Level.INFO)
.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
.<FileHandler>handle(new FileHandler(configProps))
.channel(NULL_CHANNEL);
}
感谢任何帮助。
使用自定义发布策略在最后一条消息中查找 END 标记,也许使用自定义输出处理器从集合中删除标记。
我会在 splitter
之前将 correlationId
的 header enricher 移到 splitter
之前:
.enrichHeaders(h -> h
.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
m -> m.getHeaders().getId()))
常量correlationId
在multi-threaded环境下绝对不好:不同的线程拆分不同的文件,将不同的行发送到同一个聚合器。因此,使用 "1"
作为关联键,您将始终拥有一组来聚合和发布。默认的 sequence 行为是将原始消息 id
填充到 correlationId
。由于您不会依赖 FileSplitter
中的 applySequence
,因此我建议使用简单的解决方案来模拟该行为。
正如加里在他的回答中指出的那样,您需要考虑自定义 ReleaseStrategy
并将 FileSplitter.FileMarker
发送到聚合器。 FileSplitter.FileMarker.END
有lineCount
属性可以和MessageGroup.size
比较来决定我们好放团。 MessageGroupProcessor
确实必须在构建输出结果期间过滤 FileSplitter.FileMarker
消息。
我是 Spring 集成的新手。我正在尝试使用文件拆分器从文件中拆分消息,然后使用 .aggregate() 构建单个消息并发送到输出通道。 我将标记设置为 true,因此现在默认情况下 apply-sequence 为 false。 我已使用 enrichHeaders 将 correlationId 设置为常量“1”。我无法设置 realease 策略,因为我没有保留序列结束。这是我的代码的样子。
IntegrationFlows
.from(s -> s.file(new File(fileDir))
.filter(getFileFilter(fileName)),
e -> e.poller(poller))
.split(Files.splitter(true, true)
.charset(StandardCharsets.US_ASCII),
e -> e.id(beanName)).enrichHeaders(h -> h.header("correlationId", "1"));
IntegrationFlow integrationFlow = integrationFlowBuilder
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
.channelMapping(String.class, "lines.input"))
.get();
@Bean
public IntegrationFlow itemExcludes() {
return flow -> flow.transform(new ItemExcludeRowMapper(itemExcludeRowUnmarshaller)) //This maps each line to ItemExclude object
.aggregate(aggregator -> aggregator
.outputProcessor(group -> group.getMessages()
.stream()
.map(message -> ((ItemExclude) message.getPayload()).getPartNumber())
.collect(Collectors.joining(","))))
.transform(Transformers.toJson())
.channel(customSource.itemExclude());
}
@Bean
public IntegrationFlow itemExcludeMarkers() {
return flow -> flow
.log(LoggingHandler.Level.INFO)
.<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
.<FileHandler>handle(new FileHandler(configProps))
.channel(NULL_CHANNEL);
}
感谢任何帮助。
使用自定义发布策略在最后一条消息中查找 END 标记,也许使用自定义输出处理器从集合中删除标记。
我会在 splitter
之前将 correlationId
的 header enricher 移到 splitter
之前:
.enrichHeaders(h -> h
.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
m -> m.getHeaders().getId()))
常量correlationId
在multi-threaded环境下绝对不好:不同的线程拆分不同的文件,将不同的行发送到同一个聚合器。因此,使用 "1"
作为关联键,您将始终拥有一组来聚合和发布。默认的 sequence 行为是将原始消息 id
填充到 correlationId
。由于您不会依赖 FileSplitter
中的 applySequence
,因此我建议使用简单的解决方案来模拟该行为。
正如加里在他的回答中指出的那样,您需要考虑自定义 ReleaseStrategy
并将 FileSplitter.FileMarker
发送到聚合器。 FileSplitter.FileMarker.END
有lineCount
属性可以和MessageGroup.size
比较来决定我们好放团。 MessageGroupProcessor
确实必须在构建输出结果期间过滤 FileSplitter.FileMarker
消息。