Spring Cloud Stream Kinesis Binder - 并发

Spring Cloud Stream Kinesis Binder - Concurrency

我用以下组件构建了一个 spring boot kinesis 消费者:

我使用来自具有 1 个分片的运动流的事件。此 spring 启动消费者应用程序在 Pivotal Cloud Foundry Platform.

中是 运行

在发布此问题之前,我在本地(使用 kinesalite)和 PCF(使用运动流)中尝试了该场景。能否请您确认我的理解是否正确?我浏览了 spring 云流文档 (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/ and https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc)。尽管文档详尽无遗,但并未详细解释并发性和高可用性。

假设我有 3 个消费者实例部署到 PCF(通过在 cf 推送期间使用的 manifest.yml 文件中将 instances 属性设置为 3) .

所有 3 个实例都具有以下属性:

spring.cloud.stream.bindings..consumer.concurrency=5

spring.cloud.stream.bindings..group=我的消费者组

spring.cloud.stream.kinesis.binder.checkpoint.table=我的元数据-dynamodb-table

spring.cloud.stream.kinesis.binder.locks.table=my-locks-dynamodb-table

假设生产者按此顺序将事件发送到 kinesis

事件 5(流中的最新事件)- 事件 4 - 事件 3 - 事件 2 - 事件 1(流中的第一个事件)

对于这样的配置,我在下面说明了我的理解。你能确认这是否正确吗?

  1. 在给定的时间点只有一个消费者实例处于活动状态,它将处理发送到运动流的所有事件(因为流只有一个分片)。只有当主实例关闭时,其他 2 个实例之一才会接管控制权。此配置是为了确保高可用性并保持消息的顺序。
  2. 由于 PCF 的 manifest.yml 设置了实例数,因此我不必担心设置 spring.cloud.stream.instanceCount 或 spring.cloud.stream.bindings..consumer.instanceCount 属性。
  3. 当 spring 引导消费者为 started/launched 时,
  4. 5 个消费者线程处于活动状态(因为并发设置为 5)。现在事件按照上面解释的顺序使用。 Thread1 拾取 event1。当 thread1 仍在积极处理 event1 时,另一个线程只是从流中挑选并开始处理下一个事件(thread2 处理 event2 等等......)。虽然在这种情况下保留了事件的顺序(事件 1 总是在事件 2 之前被拾取等等......),但不能保证线程 1 将在线程 2 之前完成处理事件 1。
  5. 当所有 5 个线程都忙于处理流中的 5 个事件时,如果有新事件说 event6 和 event7 进来,消费者必须等待线程可用。比如说,thread3 完成了对 event3 的处理,而其他线程仍在忙于处理 events,thread3 将拾取 event6 并开始处理,但 event7 仍然没有被拾取,因为没有可用的线程。
  6. 默认情况下,并发设置为 1。如果您的业务要求您在处理下一个事件之前完成第一个事件的处理,则并发应该为 1。在这种情况下,您将牺牲吞吐量。您一次只能消费一个事件。但是,如果吞吐量很重要,并且您希望在给定的时间点处理多个事件,则应将并发设置为所需的值。增加分片数量也是一种选择,但作为消费者,如果您不能要求增加分片数量,这是实现 parallelism/throughput.
  7. 的最佳选择

请参阅 KinesisMessageDrivenChannelAdapter 中的 concurrency 选项 JavaDocs:

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

因此,由于在那个流中只有一个分片,因此将只有一个活动线程在该单个分片上迭代 ShardIterators。

关键是我们总是必须在单个线程中处理来自单个分片的记录。通过这种方式,我们保证了正确的顺序,并且为最高序列号完成了检查点。

请进一步调查什么是 AWS Kinesis 及其工作原理。