Spring 集成 aws Kinesis、消息聚合器、发布策略
Spring Integration aws Kinesis , message aggregator, Release Strategy
这是
的后续问题
我有以下配置。我注意到,当我第一次将消息发送到名为 kinesisSendChannel 的输入通道时,将调用聚合器和发布策略并将消息发送到 Kinesis Streams。我把调试断点放在不同的地方,可以验证这种行为。但是,当我再次将消息发布到同一输入通道时,发布策略和出站处理器不会被调用,消息也不会发送到 Kinesis。我不确定为什么聚合器流只在第一次被调用而不是为后续消息调用。出于测试目的,将 TimeoutCountSequenceSizeReleaseStrategy 设置为计数为 1,时间为 60 秒。没有使用特定的 MessageStore。你能帮忙找出问题所在吗?
@Bean(name = "kinesisSendChannel")
public MessageChannel kinesisSendChannel() {
return MessageChannels.direct().get();
}
@Bean(name = "resultChannel")
public MessageChannel resultChannel() {
return MessageChannels.direct().get();
}
@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler aggregator(TestMessageProcessor messageProcessor,
MessageChannel resultChannel,
TimeoutCountSequenceSizeReleaseStrategy timeoutCountSequenceSizeReleaseStrategy) {
AggregatingMessageHandler handler = new AggregatingMessageHandler(messageProcessor);
handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
handler.setReleaseStrategy(timeoutCountSequenceSizeReleaseStrategy);
handler.setOutputProcessor(messageProcessor);
handler.setOutputChannel(resultChannel);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "resultChannel")
public MessageHandler kinesisMessageHandler1(@Qualifier("successChannel") MessageChannel successChannel,
@Qualifier("errorChannel") MessageChannel errorChannel, final AmazonKinesisAsync amazonKinesis) {
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
kinesisMessageHandler.setSync(true);
kinesisMessageHandler.setOutputChannel(successChannel);
kinesisMessageHandler.setFailureChannel(errorChannel);
return kinesisMessageHandler;
}
public class TestMessageProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
final PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("test-stream");
final List<PutRecordsRequestEntry> putRecordsRequestEntry = group.getMessages().stream()
.map(message -> (PutRecordsRequestEntry) message.getPayload()).collect(Collectors.toList());
putRecordsRequest.withRecords(putRecordsRequestEntry);
return putRecordsRequestEntry;
}
}
我认为问题出在这里 handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
。您的所有消息都带有相同的 foo
header。因此,它们都形成同一个消息组。只要你释放组,不删除它,所有新消息都会被丢弃。
请修改聚合器文档以熟悉所有可能的行为:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
这是
我有以下配置。我注意到,当我第一次将消息发送到名为 kinesisSendChannel 的输入通道时,将调用聚合器和发布策略并将消息发送到 Kinesis Streams。我把调试断点放在不同的地方,可以验证这种行为。但是,当我再次将消息发布到同一输入通道时,发布策略和出站处理器不会被调用,消息也不会发送到 Kinesis。我不确定为什么聚合器流只在第一次被调用而不是为后续消息调用。出于测试目的,将 TimeoutCountSequenceSizeReleaseStrategy 设置为计数为 1,时间为 60 秒。没有使用特定的 MessageStore。你能帮忙找出问题所在吗?
@Bean(name = "kinesisSendChannel")
public MessageChannel kinesisSendChannel() {
return MessageChannels.direct().get();
}
@Bean(name = "resultChannel")
public MessageChannel resultChannel() {
return MessageChannels.direct().get();
}
@Bean
@ServiceActivator(inputChannel = "kinesisSendChannel")
public MessageHandler aggregator(TestMessageProcessor messageProcessor,
MessageChannel resultChannel,
TimeoutCountSequenceSizeReleaseStrategy timeoutCountSequenceSizeReleaseStrategy) {
AggregatingMessageHandler handler = new AggregatingMessageHandler(messageProcessor);
handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
handler.setReleaseStrategy(timeoutCountSequenceSizeReleaseStrategy);
handler.setOutputProcessor(messageProcessor);
handler.setOutputChannel(resultChannel);
return handler;
}
@Bean
@ServiceActivator(inputChannel = "resultChannel")
public MessageHandler kinesisMessageHandler1(@Qualifier("successChannel") MessageChannel successChannel,
@Qualifier("errorChannel") MessageChannel errorChannel, final AmazonKinesisAsync amazonKinesis) {
KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
kinesisMessageHandler.setSync(true);
kinesisMessageHandler.setOutputChannel(successChannel);
kinesisMessageHandler.setFailureChannel(errorChannel);
return kinesisMessageHandler;
}
public class TestMessageProcessor extends AbstractAggregatingMessageGroupProcessor {
@Override
protected Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders) {
final PutRecordsRequest putRecordsRequest = new PutRecordsRequest().withStreamName("test-stream");
final List<PutRecordsRequestEntry> putRecordsRequestEntry = group.getMessages().stream()
.map(message -> (PutRecordsRequestEntry) message.getPayload()).collect(Collectors.toList());
putRecordsRequest.withRecords(putRecordsRequestEntry);
return putRecordsRequestEntry;
}
}
我认为问题出在这里 handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("headers['foo']"));
。您的所有消息都带有相同的 foo
header。因此,它们都形成同一个消息组。只要你释放组,不删除它,所有新消息都会被丢弃。
请修改聚合器文档以熟悉所有可能的行为:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator