Spring Cloud Aws kinesis Binder 负载均衡

Spring Cloud Aws kinesis Binder load balancing

我正在尝试为 Aws 运动流消费者实施负载平衡

根据我正在尝试实施的文档

spring:
  cloud:
    stream:
      instanceIndex: 1
      instanceCount: 3
      bindings:
        RollUpInboundStream:
          group: my-consumer-group
          destination: my-kinesis-stream
          content-type: application/json

我有 3 个容器,如果需要,我想调出 新容器(最多 6 个) 而无需重新启动现有容器。

  1. instanceIndex 从 0 或 1 开始。
  2. 如果我将 instanceCount 设置为 6 但仅启动三个实例,那么在启动新实例之前是否会消耗所有消息。
  3. 在文档中,有一个属性叫做spring.cloud.stream.bindings..consumer.concurrency,你能帮忙说明一下吗
  4. 出于某些原因,如果任何实例发生故障,是否会有任何消息未被使用。

你能帮帮我们吗

spring.cloud.stream.bindings..consumer.concurrency 是每个消费者的内部选项:

adapter.setConcurrency(properties.getConcurrency());

...

/**
 * The maximum number of concurrent {@link ConsumerInvoker}s running.
 * The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
 * Messages from within the same shard will be processed sequentially.
 * In other words each shard is tied with the particular thread.
 * By default the concurrency is unlimited and shard
 * is processed in the {@link #consumerExecutor} directly.
 * @param concurrency the concurrency maximum number
 */
public void setConcurrency(int concurrency) {

所以,这对您的分布式解决方案没有任何作用。

instanceIndexinstanceCount 在 Binder 中的工作方式如下:

    if (properties.getInstanceCount() > 1) {
        shardOffsets = new HashSet<>();
        KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
        List<Shard> shards = kinesisConsumerDestination.getShards();
        for (int i = 0; i < shards.size(); i++) {
            // divide shards across instances
            if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
                KinesisShardOffset shardOffset = new KinesisShardOffset(
                        kinesisShardOffset);
                shardOffset.setStream(destination.getName());
                shardOffset.setShard(shards.get(i).getShardId());
                shardOffsets.add(shardOffset);
            }
        }
    }

因此,每个消费者都会在流中获得一个分片子集。因此,如果您拥有的分片多于实例,您最终可能会发现某些分片未被消耗。

没有什么可以同时消费来自同一个分片的消息:每个集群只有一个线程可以消费一个分片。