AWS Kinesis 增强型扇出 Java 示例
AWS Kinesis Enhanced fan-out Java example
我有一个应用程序使用来自 Kinesis 流的记录并进一步处理它们,但性能很低,所以现在我计划迁移到使用 KCL 2.x 的 Kinesis Enhanced 扇出消费者以改进它的性能。由于增强型扇出的 Aws Kinesis 文档非常混乱,有人可以帮我举例说明如何在我的 Java 应用程序中实现此消费者功能吗?
这是一个非常详细的 KCL 2.x 消费者示例:https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html
最重要的部分是:
SampleRecordProcessor
- 消费者处理逻辑所在的 ShardRecordProcessor 接口的实现。
SampleRecordProcessorFactory
Scheduler
配置:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
上面的重要部分是指定了默认的 retrievalConfig(),它在后台配置增强的 fan-out 消费者。显式方式如下:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(
new FanOutConfig(kinesisClient)
.streamName(streamName)
.applicationName(appName)
)
.maxListShardsRetryAttempts(maxListShardsRetryAttempts)
.initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
)
);
我有一个应用程序使用来自 Kinesis 流的记录并进一步处理它们,但性能很低,所以现在我计划迁移到使用 KCL 2.x 的 Kinesis Enhanced 扇出消费者以改进它的性能。由于增强型扇出的 Aws Kinesis 文档非常混乱,有人可以帮我举例说明如何在我的 Java 应用程序中实现此消费者功能吗?
这是一个非常详细的 KCL 2.x 消费者示例:https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-kcl-java.html
最重要的部分是:
SampleRecordProcessor
- 消费者处理逻辑所在的 ShardRecordProcessor 接口的实现。SampleRecordProcessorFactory
Scheduler
配置:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
上面的重要部分是指定了默认的 retrievalConfig(),它在后台配置增强的 fan-out 消费者。显式方式如下:
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
.retrievalSpecificConfig(
new FanOutConfig(kinesisClient)
.streamName(streamName)
.applicationName(appName)
)
.maxListShardsRetryAttempts(maxListShardsRetryAttempts)
.initialPositionInStreamExtended(
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream)
)
);