Kafka Streams - Consumer memory overload
I'm planning a Spring + Kafka Streams application that processes incoming messages and stores internal state that is updated as a result of those messages.
This state is expected to reach roughly 500 MB per unique key (there are likely to be ~10k unique keys spread across 2k partitions).
This state generally has to be held in memory for my application to operate effectively, but even on disk I would still face a similar problem (albeit at a later point of scaling).
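To put those figures in perspective: 10,000 keys × 500 MB is about 5 TB of total state, and with 2,000 partitions that averages ~5 keys and ~2.5 GB of state per partition, so even a modest number of partitions can exceed a single instance's memory.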
I plan to deploy this application into a dynamically scaling environment such as AWS, and I will set a minimum instance count, but I'm wary of two situations:
- On first startup (when perhaps only one consumer comes up first), it will not be able to handle being assigned all of the partitions, because the in-memory state would overflow the instance's available memory.
- After a major outage (an AWS availability zone failure), it could be that 33% of the consumers are taken out of the group, and the additional memory load on the remaining instances could actually take out everyone who remains.
How do people protect their consumers from taking on more partitions than they can handle, so that they don't overflow the available memory/disk?
Starting with 0.11...
EDIT
For your second use case (and it also applies to the first), perhaps you could implement a custom PartitionAssignor that limits the number of partitions assigned to each instance.
I haven't tried it; I don't know how the broker will react to the existence of unassigned partitions.
EDIT2
This seems to work fine; but YMMV...
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.RoundRobinAssignor;
// Subscription location as of the 2.x clients used here; from clients 2.4 on
// it moved to ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;

public class NoMoreThanFiveAssignor extends RoundRobinAssignor {

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {

        // Let round-robin produce its normal balanced assignment first
        Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);
        // Then cap each member at 5 partitions; anything trimmed stays unassigned
        assignments.forEach((memberId, assigned) -> {
            if (assigned.size() > 5) {
                System.out.println("Reducing assignments from " + assigned.size() + " to 5 for " + memberId);
                assignments.put(memberId,
                        assigned.stream()
                            .limit(5)
                            .collect(Collectors.toList()));
            }
        });
        return assignments;
    }

}
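Note that with the classic group protocol only the group leader actually runs assign(); the capped map it returns is what the coordinator distributes to the members, so the trimmed partitions simply remain unassigned until more members join the group.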
and
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So54072362Application {

    public static void main(String[] args) {
        SpringApplication.run(So54072362Application.class, args);
    }

    // Provision a 15-partition topic so the 5-partition cap is visible
    @Bean
    public NewTopic topic() {
        return new NewTopic("so54072362", 15, (short) 1);
    }

    @KafkaListener(id = "so54072362", topics = "so54072362")
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println(record);
    }

    // Send one record to each partition on startup
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 15; i++) {
                template.send("so54072362", i, "foo", "bar");
            }
        };
    }

}
and
spring.kafka.consumer.properties.partition.assignment.strategy=com.example.NoMoreThanFiveAssignor
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
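For readers not using Spring Boot, the same registration with a plain KafkaConsumer would look something like this sketch (the bootstrap server is illustrative; the group id reuses the listener id above):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PlainConsumerConfigExample {

    public static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "so54072362");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Register the capping assignor, same as the Boot property above
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                NoMoreThanFiveAssignor.class.getName());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

}

The subscribe()/poll() usage is unchanged; the cap is enforced purely at assignment time.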
and
Reducing assignments from 15 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:28.288 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 7
2019-01-07 15:24:28.289 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:28.296 INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:46.303 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.303 INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [so54072362-0, so54072362-1, so54072362-2, so54072362-3, so54072362-4]
2019-01-07 15:24:46.304 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
Reducing assignments from 8 to 5 for consumer-2-c9a6928a-520c-4646-9dd9-4da14636744b
Reducing assignments from 7 to 5 for consumer-2-f37221f8-70bb-421d-9faf-6591cc26a76a
2019-01-07 15:24:46.310 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 8
2019-01-07 15:24:46.311 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:46.315 INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Attempt to heartbeat failed since group is rebalancing
2019-01-07 15:24:58.324 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Revoking previously assigned partitions [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324 INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [so54072362-9, so54072362-5, so54072362-7, so54072362-1, so54072362-3]
2019-01-07 15:24:58.324 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] (Re-)joining group
2019-01-07 15:24:58.330 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Successfully joined group with generation 9
2019-01-07 15:24:58.332 INFO 23485 --- [o54072362-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=so54072362] Setting newly assigned partitions [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]
2019-01-07 15:24:58.336 INFO 23485 --- [o54072362-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so54072362-14, so54072362-11, so54072362-5, so54072362-8, so54072362-2]
Of course, this leaves the unassigned partitions dangling, but it sounds like that's what you want until the region comes back online.
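If the hard-coded 5 bothers you, a possible refinement (untested; BoundedAssignor and the property name max.assigned.partitions are made up for this sketch) is to have the assignor implement Configurable. The consumer builds assignors via getConfiguredInstances(), which I believe passes the consumer properties to any Configurable instance:

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;

public class BoundedAssignor extends RoundRobinAssignor implements Configurable {

    private int maxPartitions = Integer.MAX_VALUE;

    @Override
    public void configure(Map<String, ?> configs) {
        // "max.assigned.partitions" is a hypothetical property for this sketch
        Object max = configs.get("max.assigned.partitions");
        if (max != null) {
            this.maxPartitions = Integer.parseInt(max.toString());
        }
    }

    @Override
    public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<TopicPartition>> assignments = super.assign(partitionsPerTopic, subscriptions);
        // Trim each member's list to the configured cap; the rest stay unassigned
        assignments.replaceAll((memberId, assigned) ->
                assigned.size() > this.maxPartitions
                        ? assigned.subList(0, this.maxPartitions)
                        : assigned);
        return assignments;
    }

}

The cap would then be set alongside the other consumer properties, e.g. spring.kafka.consumer.properties.max.assigned.partitions=5; the consumer will log a warning about an unknown config, which should be harmless.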