Kafka Streams - 如何扩展 Kafka 存储生成的变更日志主题
Kafka Streams - How to scale Kafka store generated changelog topics
我有多个冗余应用程序实例,它们想要使用主题的所有事件并独立存储它们以供磁盘查找(通过 rocksdb)。
为了论证,我们假设这些冗余消费者正在处理无状态的 http 请求;因此负载不使用 kafka 共享,而是使用 kafka 将数据从生产者复制到每个实例本地存储中。
查看生成的主题时,每个消费应用创建了 3 个额外的主题:
- {topicname}STATE-STORE-0000000000-更新日志
- {application-name}-{storename}-changelog
- {application-name}-{storename}-repartition
但是这些生成的主题中的每一个都与原始主题的压缩视图一样大。这意味着每个消费存储乘以原始主题(已经压缩)的大小的 3。
- 为什么kafka store需要这3个topic。我们不能简单地将流配置为在协调磁盘存储时从上次使用的偏移量重新加载吗?
- 冗余消费应用程序的每个实例都获得其唯一的一组 3 "store generated topics",还是应该将它们配置为共享同一组更新日志主题?那么,他们应该共享相同的 applicationId 还是不共享,因为他们需要消耗所有分区的所有事件?
简而言之,我担心存储可扩展性,因为我们会增加会产生更多更改日志主题的消费应用程序的数量...
这是创建商店的代码
public class ProgramMappingEventStoreFactory {
private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
private final static String STORE_NAME = "program-mapping-store";
private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";
public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
String avroRegistryUrl,
String topic,
String storeDirectory)
{
Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
.withSchemaRegistryUrl(avroRegistryUrl)
.withApplicationId(createApplicationId(APPLICATION_NAME))
.withGroupId(UUID.randomUUID().toString())
.withClientId(UUID.randomUUID().toString())
.withDefaultKeySerdeClass(SpecificAvroSerde.class)
.withDefaultValueSerdeClass(SpecificAvroSerde.class)
.withStoreDirectory(storeDirectory)
.build();
StreamsBuilder streamBuilder = new StreamsBuilder();
bootstrapStore(streamBuilder, topic);
KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
streams.start();
try {
return getStoreAndBlockUntilQueryable(STORE_NAME,
QueryableStoreTypes.keyValueStore(),
streams);
} catch (InterruptedException e) {
throw new IllegalStateException("Failed to create the LiveMediaPolicyIdStore", e);
}
}
private static <T> T getStoreAndBlockUntilQueryable(String storeName,
QueryableStoreType<T> queryableStoreType,
KafkaStreams streams)
throws InterruptedException
{
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
Thread.sleep(100);
}
}
}
private static void bootstrapStore(StreamsBuilder builder, String topic) {
KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);
table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
(newValue, aggValue) -> null,
Materialized.as(STORE_NAME));
}
private static String createApplicationId(String applicationName) {
try {
return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
logger.warning(() -> "Failed to find the hostname, generating a uique applicationId");
return String.format("%s-%s", applicationName, UUID.randomUUID());
}
}
}
如果您想将相同的状态加载到多个实例中,您应该使用 GlobalKTable
和一个唯一的 application.id
覆盖所有实例 (builder.globalTable()
)。
如果您使用 KTable
,数据将被分区,迫使您对每个实例使用不同的 application.id
。这可以被认为是一种反模式。
我也不确定你为什么要这样做 groupBy((k, v) -> KeyValue.pair(k, v)).reduce()
-- 这会导致不必要的重新分区主题。
对于为 table()
运算符生成的更新日志主题,如果使用 StreamsBuilder
,则 1.0
和 1.1
版本中存在已知错误(KStreamBuilder
不受影响)。它已在 2.0
版本 (https://issues.apache.org/jira/browse/KAFKA-6729)
中修复
我有多个冗余应用程序实例,它们想要使用主题的所有事件并独立存储它们以供磁盘查找(通过 rocksdb)。
为了论证,我们假设这些冗余消费者正在处理无状态的 http 请求;因此负载不使用 kafka 共享,而是使用 kafka 将数据从生产者复制到每个实例本地存储中。
查看生成的主题时,每个消费应用创建了 3 个额外的主题:
- {topicname}STATE-STORE-0000000000-更新日志
- {application-name}-{storename}-changelog
- {application-name}-{storename}-repartition
但是这些生成的主题中的每一个都与原始主题的压缩视图一样大。这意味着每个消费存储乘以原始主题(已经压缩)的大小的 3。
- 为什么kafka store需要这3个topic。我们不能简单地将流配置为在协调磁盘存储时从上次使用的偏移量重新加载吗?
- 冗余消费应用程序的每个实例都获得其唯一的一组 3 "store generated topics",还是应该将它们配置为共享同一组更新日志主题?那么,他们应该共享相同的 applicationId 还是不共享,因为他们需要消耗所有分区的所有事件?
简而言之,我担心存储可扩展性,因为我们会增加会产生更多更改日志主题的消费应用程序的数量...
这是创建商店的代码
public class ProgramMappingEventStoreFactory {
private static final Logger logger = Logger.getLogger(ProgramMappingEventStoreFactory.class.getName());
private final static String STORE_NAME = "program-mapping-store";
private final static String APPLICATION_NAME = "epg-mapping-catalog_program-mapping";
public static ReadOnlyKeyValueStore<ProgramMappingEventKey, ProgramMappingEvent> newInstance(String kafkaBootstrapServerUrl,
String avroRegistryUrl,
String topic,
String storeDirectory)
{
Properties kafkaConfig = new KafkaConfigBuilder().withBootstrapServers(kafkaBootstrapServerUrl)
.withSchemaRegistryUrl(avroRegistryUrl)
.withApplicationId(createApplicationId(APPLICATION_NAME))
.withGroupId(UUID.randomUUID().toString())
.withClientId(UUID.randomUUID().toString())
.withDefaultKeySerdeClass(SpecificAvroSerde.class)
.withDefaultValueSerdeClass(SpecificAvroSerde.class)
.withStoreDirectory(storeDirectory)
.build();
StreamsBuilder streamBuilder = new StreamsBuilder();
bootstrapStore(streamBuilder, topic);
KafkaStreams streams = new KafkaStreams(streamBuilder.build(), kafkaConfig);
streams.start();
try {
return getStoreAndBlockUntilQueryable(STORE_NAME,
QueryableStoreTypes.keyValueStore(),
streams);
} catch (InterruptedException e) {
throw new IllegalStateException("Failed to create the LiveMediaPolicyIdStore", e);
}
}
private static <T> T getStoreAndBlockUntilQueryable(String storeName,
QueryableStoreType<T> queryableStoreType,
KafkaStreams streams)
throws InterruptedException
{
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
Thread.sleep(100);
}
}
}
private static void bootstrapStore(StreamsBuilder builder, String topic) {
KTable<ProgramMappingEventKey, ProgramMappingEvent> table = builder.table(topic);
table.groupBy((k, v) -> KeyValue.pair(k, v)).reduce((newValue, aggValue) -> newValue,
(newValue, aggValue) -> null,
Materialized.as(STORE_NAME));
}
private static String createApplicationId(String applicationName) {
try {
return String.format("%s-%s", applicationName, InetAddress.getLocalHost().getHostName());
} catch (UnknownHostException e) {
logger.warning(() -> "Failed to find the hostname, generating a uique applicationId");
return String.format("%s-%s", applicationName, UUID.randomUUID());
}
}
}
如果您想将相同的状态加载到多个实例中,您应该使用 GlobalKTable
和一个唯一的 application.id
覆盖所有实例 (builder.globalTable()
)。
如果您使用 KTable
,数据将被分区,迫使您对每个实例使用不同的 application.id
。这可以被认为是一种反模式。
我也不确定你为什么要这样做 groupBy((k, v) -> KeyValue.pair(k, v)).reduce()
-- 这会导致不必要的重新分区主题。
对于为 table()
运算符生成的更新日志主题,如果使用 StreamsBuilder
,则 1.0
和 1.1
版本中存在已知错误(KStreamBuilder
不受影响)。它已在 2.0
版本 (https://issues.apache.org/jira/browse/KAFKA-6729)