在 KCL 2.x ( Kinesis ) 下使用来自特定分片的记录
Consume records from specific shards under KCL 2.x ( Kinesis )
我在 Kinesis 流中的某些特定分片下有一组记录。
我正在使用 KCL 2.x 消费者从运动中消费记录,但问题是消费者正在从流中可用的所有分片中获取我的记录。
那么有什么方法可以在配置 configBuilder 对象或 KCL 消费者时指定分片或其 ID,以便仅使用来自指定分片的记录。
示例代码:
configsBuilder = new ConfigsBuilder(
applicationName,
streamName,
kinesisAsyncClient,
dynamoDbClient,
cloudWatchClient,
workerID,
new RecordProcessorFactory());
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configBuilder.retrievalConfig()
);
// start the kinesis records consumer.
schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
提前致谢!
KCL 2.x 提供了一个 ShardPrioritization
接口,允许对碎片进行优先级排序或过滤:
/**
* Provides logic to prioritize or filter shards before their execution.
*/
public interface ShardPrioritization {
/**
* Returns new list of shards ordered based on their priority.
* Resulted list may have fewer shards compared to original list
*
* @param original
* list of shards needed to be prioritized
* @return new list that contains only shards that should be processed
*/
List<ShardInfo> prioritize(List<ShardInfo> original);
}
也就是说,您可以提供 ShardPrioritization
实现,它只会留下与您相关的分片。
之后,只需在协调器配置中指定您的优先级:
configsBuilder.coordinatorConfig
.shardPrioritization(new CustomShardsPrioritixation())
我在 Kinesis 流中的某些特定分片下有一组记录。 我正在使用 KCL 2.x 消费者从运动中消费记录,但问题是消费者正在从流中可用的所有分片中获取我的记录。 那么有什么方法可以在配置 configBuilder 对象或 KCL 消费者时指定分片或其 ID,以便仅使用来自指定分片的记录。
示例代码:
configsBuilder = new ConfigsBuilder(
applicationName,
streamName,
kinesisAsyncClient,
dynamoDbClient,
cloudWatchClient,
workerID,
new RecordProcessorFactory());
scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configBuilder.leaseManagementConfig(),
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configBuilder.retrievalConfig()
);
// start the kinesis records consumer.
schedulerThread = new Thread(scheduler);
schedulerThread.setDaemon(true);
schedulerThread.start();
提前致谢!
KCL 2.x 提供了一个 ShardPrioritization
接口,允许对碎片进行优先级排序或过滤:
/**
* Provides logic to prioritize or filter shards before their execution.
*/
public interface ShardPrioritization {
/**
* Returns new list of shards ordered based on their priority.
* Resulted list may have fewer shards compared to original list
*
* @param original
* list of shards needed to be prioritized
* @return new list that contains only shards that should be processed
*/
List<ShardInfo> prioritize(List<ShardInfo> original);
}
也就是说,您可以提供 ShardPrioritization
实现,它只会留下与您相关的分片。
之后,只需在协调器配置中指定您的优先级:
configsBuilder.coordinatorConfig
.shardPrioritization(new CustomShardsPrioritixation())