Multiple consumers on a Kinesis stream using the Kinesis Client Library (KCL 2.x)
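I have an application that uses KCL 2.x to consume records from Kinesis. The data in different shards of the stream is in different formats, and I want to process each shard separately with a different KCL consumer that has its own configuration. For example, if I have 3 shards in the Kinesis stream, I would spawn 3 consumers with different configurations, i.e. 1 consumer per shard.
The problem I'm facing is that when I configure 3 different consumers in Java code, one of them takes the lease lock on all the shards and the other consumers cannot acquire any lease.
For example:
Total shards: 3
Total consumers configured: 3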
Application logs:
[2020-07-13 18:55:50,549] (LeaseCoordinator-0000) INFO Worker application-test-stream saw 3 total leases, 3 available leases, 1 workers. Target is 3 leases, I have 0 leases, I will take 3 leases (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:397)
[2020-07-13 18:55:50,549] (LeaseCoordinator-0002) INFO Worker application-test-stream saw 3 total leases, 3 available leases, 1 workers. Target is 3 leases, I have 0 leases, I will take 3 leases (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:397)
[2020-07-13 18:55:50,554] (Thread-22) INFO Initialization complete. Starting worker loop. (software.amazon.kinesis.coordinator.Scheduler:238)
[2020-07-13 18:55:50,842] (LeaseCoordinator-0004) INFO Worker application-test-stream saw 3 total leases, 3 available leases, 1 workers. Target is 3 leases, I have 0 leases, I will take 3 leases (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:397)
[2020-07-13 18:55:51,452] (LeaseCoordinator-0000) INFO Worker application-test-stream successfully took 3 leases: shardId-000000000002, shardId-000000000001, shardId-000000000000 (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:203)
[2020-07-13 18:55:51,457] (LeaseCoordinator-0002) INFO Worker application-test-stream failed to take 3 leases: shardId-000000000002, shardId-000000000001, shardId-000000000000 (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:208)
[2020-07-13 18:55:51,757] (LeaseCoordinator-0004) INFO Worker application-test-stream failed to take 3 leases: shardId-000000000002, shardId-000000000001, shardId-000000000000 (software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseTaker:208)
How can I configure my KCL consumers so that each consumer only takes the lease for the shard assigned to it?
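Typically, Kinesis consumers are stream-level consumers, i.e. each consumer application consumes all the shards in the stream. In your case, you could ignore/skip the records you don't care about in the record processor, depending on the shard. There is no out-of-the-box configuration option to consume only specific shards.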
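The "skip depending on the shard" idea can be sketched in plain Java without any KCL dependency. In KCL 2.x a record processor learns which shard it owns when it is initialized (via the initialization input's `shardId()`), so it can look up a format-specific parser for that shard and skip everything else. All names below (`ShardFormatRouter`, `register`, `process`, `utf8`, and the `csv:`/`json:` parsers) are hypothetical and only illustrate the routing; this is a minimal sketch under those assumptions, not the library's API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical helper: picks a format-specific parser by shard ID. */
final class ShardFormatRouter {
    private final Map<String, Function<ByteBuffer, String>> parsersByShard = new HashMap<>();

    /** Registers the parser to use for records coming from the given shard. */
    ShardFormatRouter register(String shardId, Function<ByteBuffer, String> parser) {
        parsersByShard.put(shardId, parser);
        return this;
    }

    /**
     * Parses every payload with the parser registered for shardId.
     * Returns an empty list (records skipped) when no parser is registered.
     */
    List<String> process(String shardId, List<ByteBuffer> payloads) {
        List<String> parsed = new ArrayList<>();
        Function<ByteBuffer, String> parser = parsersByShard.get(shardId);
        if (parser == null) {
            return parsed; // this consumer does not care about this shard
        }
        for (ByteBuffer payload : payloads) {
            // Read-only view so the caller's buffer position is left untouched.
            parsed.add(parser.apply(payload.asReadOnlyBuffer()));
        }
        return parsed;
    }

    /** Decodes a payload as UTF-8 text. */
    static String utf8(ByteBuffer payload) {
        return StandardCharsets.UTF_8.decode(payload).toString();
    }

    public static void main(String[] args) {
        // Assumed formats per shard, purely for illustration.
        ShardFormatRouter router = new ShardFormatRouter()
                .register("shardId-000000000000", b -> "csv:" + utf8(b))
                .register("shardId-000000000001", b -> "json:" + utf8(b));

        List<ByteBuffer> batch =
                List.of(ByteBuffer.wrap("a,b".getBytes(StandardCharsets.UTF_8)));

        System.out.println(router.process("shardId-000000000000", batch));
        System.out.println(router.process("shardId-000000000002", batch)); // no parser: skipped
    }
}
```

In a real processor, the shard ID would be stored during initialization and `process` would be called from the record-processing callback with each record's data buffer. A single KCL application could then own all three leases and still handle each shard's format differently.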