Consume records from specific shards under KCL 2.x ( Kinesis )

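I have a set of records in some specific shards of a Kinesis stream. I'm using a KCL 2.x consumer to consume records from Kinesis, but the consumer is picking up my records from all the shards available in the stream. Is there a way to specify the shards, or their IDs, when configuring the ConfigsBuilder object or the KCL consumer, so that only records from those shards are consumed?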

Sample code:

// KCL 2.x's ConfigsBuilder takes the stream name first, then the application name.
configsBuilder = new ConfigsBuilder(
        streamName,
        applicationName,
        kinesisAsyncClient,
        dynamoDbClient,
        cloudWatchClient,
        workerID,
        new RecordProcessorFactory());

scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig());

// Start the Kinesis record consumer on a daemon thread.
schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();

Thanks in advance!

KCL 2.x provides a ShardPrioritization interface that lets you prioritize or filter shards:

/**
 * Provides logic to prioritize or filter shards before their execution.
 */
public interface ShardPrioritization {

    /**
     * Returns new list of shards ordered based on their priority.
     * Resulted list may have fewer shards compared to original list
     * 
     * @param original
     *            list of shards needed to be prioritized
     * @return new list that contains only shards that should be processed
     */
    List<ShardInfo> prioritize(List<ShardInfo> original);
}

In other words, you can supply your own ShardPrioritization implementation whose prioritize() returns only the shards you care about.
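A minimal sketch of such a filter, assuming you already know the IDs of the shards you want (the IDs below are made up for illustration). To keep it runnable without KCL on the classpath, it declares stand-ins for ShardInfo and ShardPrioritization as nested types; in a real project you would delete those and import the KCL types instead (as far as I know they live in the software.amazon.kinesis.leases package). The class name CustomShardsPrioritization and its constructor taking a set of shard IDs are this sketch's own choices, not part of KCL.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ShardFilterDemo {

    // Stand-in for KCL's ShardInfo: only the shardId() accessor is modelled here.
    static final class ShardInfo {
        private final String shardId;

        ShardInfo(String shardId) {
            this.shardId = shardId;
        }

        String shardId() {
            return shardId;
        }
    }

    // Same shape as the ShardPrioritization interface quoted above.
    interface ShardPrioritization {
        List<ShardInfo> prioritize(List<ShardInfo> original);
    }

    // Hypothetical implementation: keeps only shards whose IDs are in an allow-list,
    // in the same order as the input list.
    static final class CustomShardsPrioritization implements ShardPrioritization {
        private final Set<String> allowedShardIds;

        CustomShardsPrioritization(Set<String> allowedShardIds) {
            // Copy so later changes to the caller's set don't affect the filter.
            this.allowedShardIds = Collections.unmodifiableSet(new HashSet<>(allowedShardIds));
        }

        @Override
        public List<ShardInfo> prioritize(List<ShardInfo> original) {
            List<ShardInfo> kept = new ArrayList<>();
            for (ShardInfo shard : original) {
                if (allowedShardIds.contains(shard.shardId())) {
                    kept.add(shard);
                }
            }
            return kept;
        }
    }

    public static void main(String[] args) {
        ShardPrioritization filter = new CustomShardsPrioritization(
                new HashSet<>(Arrays.asList("shardId-000000000001", "shardId-000000000003")));
        List<ShardInfo> all = Arrays.asList(
                new ShardInfo("shardId-000000000000"),
                new ShardInfo("shardId-000000000001"),
                new ShardInfo("shardId-000000000002"),
                new ShardInfo("shardId-000000000003"));
        // Prints shardId-000000000001, then shardId-000000000003.
        for (ShardInfo shard : filter.prioritize(all)) {
            System.out.println(shard.shardId());
        }
    }
}
```

The filter keeps the input order, so whatever order KCL passed in is preserved; if you also want to prioritize, you could sort the kept list before returning it.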

Then set your implementation on the coordinator config that you pass to the Scheduler:

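// coordinatorConfig() returns a new CoordinatorConfig on each call, so pass this same instance to the Scheduler.
// allowedShardIds is a hypothetical Set<String> holding the IDs of the shards you want.
CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig()
        .shardPrioritization(new CustomShardsPrioritization(allowedShardIds));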