Spring 集成 aws Kinesis、消息聚合器、发布策略

Spring Integration aws Kinesis , message aggregator, Release Strategy

这是

的后续问题

我有以下配置。我注意到,当我第一次将消息发送到名为 kinesisSendChannel 的输入通道时,将调用聚合器和发布策略并将消息发送到 Kinesis Streams。我把调试断点放在不同的地方,可以验证这种行为。但是,当我再次将消息发布到同一输入通道时,发布策略和出站处理器不会被调用,消息也不会发送到 Kinesis。我不确定为什么聚合器流只在第一次被调用而不是为后续消息调用。出于测试目的,将 TimeoutCountSequenceSizeReleaseStrategy 设置为计数为 1,时间为 60 秒。没有使用特定的 MessageStore。你能帮忙找出问题所在吗?

@Bean(name = "kinesisSendChannel")
public MessageChannel kinesisSendChannel() {
    return MessageChannels.direct().get();
}

@Bean(name = "resultChannel")
public MessageChannel resultChannel() {
    return MessageChannels.direct().get();
}

@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler aggregator(TestMessageProcessor messageProcessor,
        MessageChannel resultChannel,
        TimeoutCountSequenceSizeReleaseStrategy timeoutCountSequenceSizeReleaseStrategy) {
    AggregatingMessageHandler handler = new AggregatingMessageHandler(messageProcessor);
    handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
    handler.setReleaseStrategy(timeoutCountSequenceSizeReleaseStrategy);
    handler.setOutputProcessor(messageProcessor);
    handler.setOutputChannel(resultChannel);
    return handler;

}

@Bean
@ServiceActivator(inputChannel = "resultChannel")
public MessageHandler kinesisMessageHandler1(@Qualifier("successChannel") MessageChannel successChannel,
        @Qualifier("errorChannel") MessageChannel errorChannel, final AmazonKinesisAsync amazonKinesis) {
    KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
    kinesisMessageHandler.setSync(true);
    kinesisMessageHandler.setOutputChannel(successChannel);
    kinesisMessageHandler.setFailureChannel(errorChannel);

    return kinesisMessageHandler;
}



public class TestMessageProcessor extends AbstractAggregatingMessageGroupProcessor {

    @Override
    protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
        final PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("test-stream");

        final List<PutRecordsRequestEntry> putRecordsRequestEntry = group.getMessages().stream()
                .map(message -> (PutRecordsRequestEntry) message.getPayload()).collect(Collectors.toList());

        putRecordsRequest.withRecords(putRecordsRequestEntry);
        return putRecordsRequestEntry;

    }

}

我认为问题出在这里 handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));。您的所有消息都带有相同的 foo header。因此,它们都形成同一个消息组。只要你释放组,不删除它,所有新消息都会被丢弃。

请修改聚合器文档以熟悉所有可能的行为:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator