Spring 集成使异步网关等待聚合结果
Spring integration make asynchronous gateway wait for aggregation result
我正在使用 spring 集成来处理文件。我想逐行读取文件,过滤,转换并发送到 kafka。通过异步网关传递文件进行处理。在处理结束时,我添加了聚合器用于两个目的:
- 我想等到处理完整个文件。也就是说,所有正确处理的行都已发送到 kafka,所有导致错误的行都由从错误通道读取的流处理。
- 作为网关的返回值,我想接收文件处理的汇总结果。从文件中读取了多少行,没有错误地处理了多少行等
到目前为止,我已经创建了等待文件标记 START 和 END + 所有已处理行的聚合器。我创建了自定义消息组来计算已处理的行数,还存储 replyChannel
取自文件 START 标记消息和行数,取自文件 END 标记消息。该组还设置了 5s 超时。当组完成时,它会发出消息,其中包含我想要的统计信息和 replyChannel
header 以及从 START 标记获取的值。我的理解是网关等待对此 replyChannel
的响应。这是消息组的代码:
public void add(Message<?> messageToAdd) {
if(messageToAdd.getPayload() instanceof FileMarker) {
FileMarker marker = (FileMarker) messageToAdd.getPayload();
switch (marker.getMark()) {
case START:
replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
break;
case END:
expectedRowCount.set(marker.getLineCount());
break;
default:
throw new IllegalStateException("Unexpected file mark");
}
} else {
ProcessingResult processingResult = messageToAdd.getHeaders()
.get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
assert processingResult != null;
rowCount.incrementAndGet();
switch (processingResult) {
case FILTERED:
filteredCount.incrementAndGet();
break;
case PROCESSED:
processedCount.incrementAndGet();
break;
case PROCESSING_ERROR:
processingErrorCount.incrementAndGet();
break;
case KAFKA_ERROR:
kafkaErrorCount.incrementAndGet();
break;
default:
throw new IllegalStateException("Unrecognized processing result: " + processingResult);
}
}
}
我遇到的问题是我的网关从未收到响应并无限等待。 如何让异步网关等待聚合器发出其消息并接收此消息的负载作为网关处理的结果?
可以在此处找到重现问题的测试 https://github.com/hawk1234/spring-integration-example 提交 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4。当您 运行 时,测试应用程序日志在路径 build/logs/spring-integration-example.log 下可用。当前测试挂起,因为网关从未收到响应。此外,整个流程正在进行中,因此组仅在超时后发布。
I have created custom message group that
这是错误的。
当你已经拥有组中的所有消息(包括FileMarker
)时,你必须执行所有必需的逻辑,标准组。有一个MessageGroupProcessor
,当ReleaseStrategy
returns true
时调用3。因此,此处您拥有群组的所有消息,您可以执行所有必需的统计信息收集以生成所需的回复。
您的应用程序太复杂,无法快速消化。我看到您没有将错误发送给聚合器。那个必须在顶部流的某个地方,而不仅仅是在 FileMarker
子流中。我的意思是最好在单独的顶级流程中有一个聚合器并真正将 FileMarker
直接发送到那个聚合器,但其余部分来自 sendSuccessChannel()
并且也在处理错误之后。
我已经设法解决了这个问题。我必须手动将聚合消息发送到 MessageHeaders.REPLY_CHANNEL
才能执行此操作,我使用了方法 BaseIntegrationFlowDefinition#logAndReply
。提交 eefd5f472891ded39f363ff71ec530ada800f704.
可获得修复以及完成的示例
我正在使用 spring 集成来处理文件。我想逐行读取文件,过滤,转换并发送到 kafka。通过异步网关传递文件进行处理。在处理结束时,我添加了聚合器用于两个目的:
- 我想等到处理完整个文件。也就是说,所有正确处理的行都已发送到 kafka,所有导致错误的行都由从错误通道读取的流处理。
- 作为网关的返回值,我想接收文件处理的汇总结果。从文件中读取了多少行,没有错误地处理了多少行等
到目前为止,我已经创建了等待文件标记 START 和 END + 所有已处理行的聚合器。我创建了自定义消息组来计算已处理的行数,还存储 replyChannel
取自文件 START 标记消息和行数,取自文件 END 标记消息。该组还设置了 5s 超时。当组完成时,它会发出消息,其中包含我想要的统计信息和 replyChannel
header 以及从 START 标记获取的值。我的理解是网关等待对此 replyChannel
的响应。这是消息组的代码:
public void add(Message<?> messageToAdd) {
if(messageToAdd.getPayload() instanceof FileMarker) {
FileMarker marker = (FileMarker) messageToAdd.getPayload();
switch (marker.getMark()) {
case START:
replyChannel = messageToAdd.getHeaders().get(MessageHeaders.REPLY_CHANNEL);
break;
case END:
expectedRowCount.set(marker.getLineCount());
break;
default:
throw new IllegalStateException("Unexpected file mark");
}
} else {
ProcessingResult processingResult = messageToAdd.getHeaders()
.get(ProcessingConstants.PROCESSING_RESULT_HEADER, ProcessingResult.class);
assert processingResult != null;
rowCount.incrementAndGet();
switch (processingResult) {
case FILTERED:
filteredCount.incrementAndGet();
break;
case PROCESSED:
processedCount.incrementAndGet();
break;
case PROCESSING_ERROR:
processingErrorCount.incrementAndGet();
break;
case KAFKA_ERROR:
kafkaErrorCount.incrementAndGet();
break;
default:
throw new IllegalStateException("Unrecognized processing result: " + processingResult);
}
}
}
我遇到的问题是我的网关从未收到响应并无限等待。 如何让异步网关等待聚合器发出其消息并接收此消息的负载作为网关处理的结果?
可以在此处找到重现问题的测试 https://github.com/hawk1234/spring-integration-example 提交 9f121f0729d8076872e6fbdcd7b1b91ca9ea8cb4。当您 运行 时,测试应用程序日志在路径 build/logs/spring-integration-example.log 下可用。当前测试挂起,因为网关从未收到响应。此外,整个流程正在进行中,因此组仅在超时后发布。
I have created custom message group that
这是错误的。
当你已经拥有组中的所有消息(包括FileMarker
)时,你必须执行所有必需的逻辑,标准组。有一个MessageGroupProcessor
,当ReleaseStrategy
returns true
时调用3。因此,此处您拥有群组的所有消息,您可以执行所有必需的统计信息收集以生成所需的回复。
您的应用程序太复杂,无法快速消化。我看到您没有将错误发送给聚合器。那个必须在顶部流的某个地方,而不仅仅是在 FileMarker
子流中。我的意思是最好在单独的顶级流程中有一个聚合器并真正将 FileMarker
直接发送到那个聚合器,但其余部分来自 sendSuccessChannel()
并且也在处理错误之后。
我已经设法解决了这个问题。我必须手动将聚合消息发送到 MessageHeaders.REPLY_CHANNEL
才能执行此操作,我使用了方法 BaseIntegrationFlowDefinition#logAndReply
。提交 eefd5f472891ded39f363ff71ec530ada800f704.