AWS Kinesis 增强型扇出 Java 示例

AWS Kinesis Enhanced fan-out Java example

我有一个应用程序使用来自 Kinesis 流的记录并进一步处理它们,但性能很低,所以现在我计划迁移到使用 KCL 2.x 的 Kinesis Enhanced 扇出消费者以改进它的性能。由于增强型扇出的 Aws Kinesis 文档非常混乱,有人可以帮我举例说明如何在我的 Java 应用程序中实现此消费者功能吗?

这是一个非常详细的 KCL 2.x 消费者示例:https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html

最重要的部分是:

  • SampleRecordProcessor - 消费者处理逻辑所在的 ShardRecordProcessor 接口的实现。
  • SampleRecordProcessorFactory
  • Scheduler 配置:
Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );

上面的重要部分是指定了默认的 retrievalConfig(),它在后台配置增强的 fan-out 消费者。显式方式如下:

Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                       .retrievalSpecificConfig(
                           new FanOutConfig(kinesisClient)
                             .streamName(streamName)
                             .applicationName(appName)
                           )
                       .maxListShardsRetryAttempts(maxListShardsRetryAttempts)
                       .initialPositionInStreamExtended(
       InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
                 )
        );