Spring Cloud Stream Kinesis Binder - 并发
Spring Cloud Stream Kinesis Binder - Concurrency
我用以下组件构建了一个 spring boot kinesis 消费者:
- spring 引导(版本 - 2.1.2.RELEASE)
- spring云(版本-Greenwich.RELEASE)
- spring 云流运动绑定器(版本 - 1.1.0.RELEASE)
我使用来自具有 1 个分片的运动流的事件。此 spring 启动消费者应用程序在 Pivotal Cloud Foundry Platform.
中是 运行
在发布此问题之前,我在本地(使用 kinesalite)和 PCF(使用运动流)中尝试了该场景。能否请您确认我的理解是否正确?我浏览了 spring 云流文档 (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/ and https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc)。尽管文档详尽无遗,但并未详细解释并发性和高可用性。
假设我有 3 个消费者实例部署到 PCF(通过在 cf 推送期间使用的 manifest.yml 文件中将 instances 属性设置为 3) .
所有 3 个实例都具有以下属性:
spring.cloud.stream.bindings..consumer.concurrency=5
spring.cloud.stream.bindings..group=我的消费者组
spring.cloud.stream.kinesis.binder.checkpoint.table=我的元数据-dynamodb-table
spring.cloud.stream.kinesis.binder.locks.table=my-locks-dynamodb-table
假设生产者按此顺序将事件发送到 kinesis
事件 5(流中的最新事件)- 事件 4 - 事件 3 - 事件 2 - 事件 1(流中的第一个事件)
对于这样的配置,我在下面说明了我的理解。你能确认这是否正确吗?
- 在给定的时间点只有一个消费者实例处于活动状态,它将处理发送到运动流的所有事件(因为流只有一个分片)。只有当主实例关闭时,其他 2 个实例之一才会接管控制权。此配置是为了确保高可用性并保持消息的顺序。
- 由于 PCF 的 manifest.yml 设置了实例数,因此我不必担心设置 spring.cloud.stream.instanceCount 或 spring.cloud.stream.bindings..consumer.instanceCount 属性。
当 spring 引导消费者为 started/launched 时,- 5 个消费者线程处于活动状态(因为并发设置为 5)。现在事件按照上面解释的顺序使用。 Thread1 拾取 event1。当 thread1 仍在积极处理 event1 时,另一个线程只是从流中挑选并开始处理下一个事件(thread2 处理 event2 等等......)。虽然在这种情况下保留了事件的顺序(事件 1 总是在事件 2 之前被拾取等等......),但不能保证线程 1 将在线程 2 之前完成处理事件 1。
- 当所有 5 个线程都忙于处理流中的 5 个事件时,如果有新事件说 event6 和 event7 进来,消费者必须等待线程可用。比如说,thread3 完成了对 event3 的处理,而其他线程仍在忙于处理 events,thread3 将拾取 event6 并开始处理,但 event7 仍然没有被拾取,因为没有可用的线程。
- 默认情况下,并发设置为 1。如果您的业务要求您在处理下一个事件之前完成第一个事件的处理,则并发应该为 1。在这种情况下,您将牺牲吞吐量。您一次只能消费一个事件。但是,如果吞吐量很重要,并且您希望在给定的时间点处理多个事件,则应将并发设置为所需的值。增加分片数量也是一种选择,但作为消费者,如果您不能要求增加分片数量,这是实现 parallelism/throughput.
的最佳选择
请参阅 KinesisMessageDrivenChannelAdapter
中的 concurrency
选项 JavaDocs:
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
因此,由于在那个流中只有一个分片,因此将只有一个活动线程在该单个分片上迭代 ShardIterator
s。
关键是我们总是必须在单个线程中处理来自单个分片的记录。通过这种方式,我们保证了正确的顺序,并且为最高序列号完成了检查点。
请进一步调查什么是 AWS Kinesis 及其工作原理。
我用以下组件构建了一个 spring boot kinesis 消费者:
- spring 引导(版本 - 2.1.2.RELEASE)
- spring云(版本-Greenwich.RELEASE)
- spring 云流运动绑定器(版本 - 1.1.0.RELEASE)
我使用来自具有 1 个分片的运动流的事件。此 spring 启动消费者应用程序在 Pivotal Cloud Foundry Platform.
中是 运行在发布此问题之前,我在本地(使用 kinesalite)和 PCF(使用运动流)中尝试了该场景。能否请您确认我的理解是否正确?我浏览了 spring 云流文档 (https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/ and https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc)。尽管文档详尽无遗,但并未详细解释并发性和高可用性。
假设我有 3 个消费者实例部署到 PCF(通过在 cf 推送期间使用的 manifest.yml 文件中将 instances 属性设置为 3) .
所有 3 个实例都具有以下属性:
spring.cloud.stream.bindings..consumer.concurrency=5
spring.cloud.stream.bindings..group=我的消费者组
spring.cloud.stream.kinesis.binder.checkpoint.table=我的元数据-dynamodb-table
spring.cloud.stream.kinesis.binder.locks.table=my-locks-dynamodb-table
假设生产者按此顺序将事件发送到 kinesis
事件 5(流中的最新事件)- 事件 4 - 事件 3 - 事件 2 - 事件 1(流中的第一个事件)
对于这样的配置,我在下面说明了我的理解。你能确认这是否正确吗?
- 在给定的时间点只有一个消费者实例处于活动状态,它将处理发送到运动流的所有事件(因为流只有一个分片)。只有当主实例关闭时,其他 2 个实例之一才会接管控制权。此配置是为了确保高可用性并保持消息的顺序。
- 由于 PCF 的 manifest.yml 设置了实例数,因此我不必担心设置 spring.cloud.stream.instanceCount 或 spring.cloud.stream.bindings..consumer.instanceCount 属性。 当 spring 引导消费者为 started/launched 时,
- 5 个消费者线程处于活动状态(因为并发设置为 5)。现在事件按照上面解释的顺序使用。 Thread1 拾取 event1。当 thread1 仍在积极处理 event1 时,另一个线程只是从流中挑选并开始处理下一个事件(thread2 处理 event2 等等......)。虽然在这种情况下保留了事件的顺序(事件 1 总是在事件 2 之前被拾取等等......),但不能保证线程 1 将在线程 2 之前完成处理事件 1。
- 当所有 5 个线程都忙于处理流中的 5 个事件时,如果有新事件说 event6 和 event7 进来,消费者必须等待线程可用。比如说,thread3 完成了对 event3 的处理,而其他线程仍在忙于处理 events,thread3 将拾取 event6 并开始处理,但 event7 仍然没有被拾取,因为没有可用的线程。
- 默认情况下,并发设置为 1。如果您的业务要求您在处理下一个事件之前完成第一个事件的处理,则并发应该为 1。在这种情况下,您将牺牲吞吐量。您一次只能消费一个事件。但是,如果吞吐量很重要,并且您希望在给定的时间点处理多个事件,则应将并发设置为所需的值。增加分片数量也是一种选择,但作为消费者,如果您不能要求增加分片数量,这是实现 parallelism/throughput. 的最佳选择
请参阅 KinesisMessageDrivenChannelAdapter
中的 concurrency
选项 JavaDocs:
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
因此,由于在那个流中只有一个分片,因此将只有一个活动线程在该单个分片上迭代 ShardIterator
s。
关键是我们总是必须在单个线程中处理来自单个分片的记录。通过这种方式,我们保证了正确的顺序,并且为最高序列号完成了检查点。
请进一步调查什么是 AWS Kinesis 及其工作原理。