Spring Intgergation aws - KinesisMessageHandler 直接通道

Spring Intgergation aws - KinesisMessageHandler Direct Channel

我将消息发布到运动流的消息处理程序如下

public MessageHandler kinesisMessageHandler(final AmazonKinesisAsync amazonKinesis,
        @Qualifier("successChannel") MessageChannel successChannel,
        @Qualifier("errorChannel") MessageChannel errorChannel) {

    KinesisMessageHandler kinesisMessageHandler = new KinesisMessageHandler(amazonKinesis);
    kinesisMessageHandler.setSync(false);
    kinesisMessageHandler.setOutputChannel(successChannel);
    kinesisMessageHandler.setFailureChannel(errorChannel);
    return kinesisMessageHandler;
}

@Bean(name = "errorChannel")
public MessageChannel errorChannel() {
      return MessageChannels.direct().get();
}

@Bean(name = "successChannel")
public MessageChannel successChannel() {
     return MessageChannels.direct().get();
}

setSync 标志设置为 false,以便处理消息 asynchronously.Also,我创建了单独的 IntegrationFlow 来接收和处理来自成功和错误通道的 Kinesis 响应。

public IntegrationFlow successMessageIntegrationFlow(MessageChannel successChannel,
                MessageChannel inboundKinesisMessageChannel,
                MessageReceiverServiceActivator kinesisMessageReceiverServiceActivator) {
            return IntegrationFlows.from(successChannel).channel(inboundKinesisMessageChannel)
                    .handle(kinesisMessageReceiverServiceActivator, "receiveMessage").get();
}

@Bean
public IntegrationFlow errorMessageIntegrationFlow(MessageChannel errorChannel,
                MessageChannel inboundKinesisErrorChannel,
                MessageReceiverServiceActivator kinesisErrorReceiverServiceActivator
               ) {
            return IntegrationFlows.from(errorChannel).channel(inboundKinesisErrorChannel)
                    .handle(kinesisErrorReceiverServiceActivator, "receiveMessage").get();
}

我想知道您在使用直接通道从 Kinesis 接收成功和错误响应并使用 IntegrationFlow 处理它时是否发现任何问题。据我所知,对于直接通道,生产者在发送期间是一个阻塞者,直到消费者完成其工作并将 returns 管理返回给生产者调用者。此处生产者由 AmazonKinesisAsyncClient 在一组不同的线程池中执行并且生产者不会等待 IntegrationFlow 处理消息是否正确?让我知道如果我需要以不同的方式实现它

您关于阻塞的假设是正确的:控制不会返回到生产线程。因此,如果该 Kinesis 客户端中的线程数量有限,您需要确保尽快释放它们。您可能会考虑将这些回调放在队列通道中。它们无论如何都是异步的,但如果那样的话,它们将不会持有 Kinesis 客户端。

您的流程仍然存在缺陷:.channel(inboundKinesisMessageChannel)。这意味着如果有两个不同的流,中间的同一个通道。如果它是直接的,那么你最终会得到循环分配。我会完全删除它。