Spring Cloud Aws kinesis Binder 负载均衡
Spring Cloud Aws kinesis Binder load balancing
我正在尝试为 Aws 运动流消费者实施负载平衡
根据我正在尝试实施的文档
spring:
cloud:
stream:
instanceIndex: 1
instanceCount: 3
bindings:
RollUpInboundStream:
group: my-consumer-group
destination: my-kinesis-stream
content-type: application/json
我有 3 个容器,如果需要,我想调出 新容器(最多 6 个) 而无需重新启动现有容器。
- instanceIndex 从 0 或 1 开始。
- 如果我将 instanceCount 设置为 6 但仅启动三个实例,那么在启动新实例之前是否会消耗所有消息。
- 在文档中,有一个属性叫做spring.cloud.stream.bindings..consumer.concurrency,你能帮忙说明一下吗
- 出于某些原因,如果任何实例发生故障,是否会有任何消息未被使用。
你能帮帮我们吗
spring.cloud.stream.bindings..consumer.concurrency
是每个消费者的内部选项:
adapter.setConcurrency(properties.getConcurrency());
...
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
所以,这对您的分布式解决方案没有任何作用。
instanceIndex
和 instanceCount
在 Binder 中的工作方式如下:
if (properties.getInstanceCount() > 1) {
shardOffsets = new HashSet<>();
KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
List<Shard> shards = kinesisConsumerDestination.getShards();
for (int i = 0; i < shards.size(); i++) {
// divide shards across instances
if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
KinesisShardOffset shardOffset = new KinesisShardOffset(
kinesisShardOffset);
shardOffset.setStream(destination.getName());
shardOffset.setShard(shards.get(i).getShardId());
shardOffsets.add(shardOffset);
}
}
}
因此,每个消费者都会在流中获得一个分片子集。因此,如果您拥有的分片多于实例,您最终可能会发现某些分片未被消耗。
没有什么可以同时消费来自同一个分片的消息:每个集群只有一个线程可以消费一个分片。
我正在尝试为 Aws 运动流消费者实施负载平衡
根据我正在尝试实施的文档
spring:
cloud:
stream:
instanceIndex: 1
instanceCount: 3
bindings:
RollUpInboundStream:
group: my-consumer-group
destination: my-kinesis-stream
content-type: application/json
我有 3 个容器,如果需要,我想调出 新容器(最多 6 个) 而无需重新启动现有容器。
- instanceIndex 从 0 或 1 开始。
- 如果我将 instanceCount 设置为 6 但仅启动三个实例,那么在启动新实例之前是否会消耗所有消息。
- 在文档中,有一个属性叫做spring.cloud.stream.bindings..consumer.concurrency,你能帮忙说明一下吗
- 出于某些原因,如果任何实例发生故障,是否会有任何消息未被使用。
你能帮帮我们吗
spring.cloud.stream.bindings..consumer.concurrency
是每个消费者的内部选项:
adapter.setConcurrency(properties.getConcurrency());
...
/**
* The maximum number of concurrent {@link ConsumerInvoker}s running.
* The {@link ShardConsumer}s are evenly distributed between {@link ConsumerInvoker}s.
* Messages from within the same shard will be processed sequentially.
* In other words each shard is tied with the particular thread.
* By default the concurrency is unlimited and shard
* is processed in the {@link #consumerExecutor} directly.
* @param concurrency the concurrency maximum number
*/
public void setConcurrency(int concurrency) {
所以,这对您的分布式解决方案没有任何作用。
instanceIndex
和 instanceCount
在 Binder 中的工作方式如下:
if (properties.getInstanceCount() > 1) {
shardOffsets = new HashSet<>();
KinesisConsumerDestination kinesisConsumerDestination = (KinesisConsumerDestination) destination;
List<Shard> shards = kinesisConsumerDestination.getShards();
for (int i = 0; i < shards.size(); i++) {
// divide shards across instances
if ((i % properties.getInstanceCount()) == properties.getInstanceIndex()) {
KinesisShardOffset shardOffset = new KinesisShardOffset(
kinesisShardOffset);
shardOffset.setStream(destination.getName());
shardOffset.setShard(shards.get(i).getShardId());
shardOffsets.add(shardOffset);
}
}
}
因此,每个消费者都会在流中获得一个分片子集。因此,如果您拥有的分片多于实例,您最终可能会发现某些分片未被消耗。
没有什么可以同时消费来自同一个分片的消息:每个集群只有一个线程可以消费一个分片。