Kafka Streams - 减少大型状态存储的内存占用
Kafka Streams - reducing the memory footprint for large state stores
我有一个拓扑结构(见下文)可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用率非常高,我正在寻找一些关于如何减少状态存储占用空间的建议(下面有更多详细信息)。 注意:我并不是想逃避国家商店,我只是认为我可能有一种方法可以改进我的拓扑 - 见下文。
// stream receives 1 billion+ messages per day
stream
.flatMap((key, msg) -> rekeyMessages(msg))
.groupBy((key, value) -> key)
.reduce(new MyReducer(), MY_REDUCED_STORE)
.toStream()
.to(OUTPUT_TOPIC);
// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);
// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)
// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)
// etc
更具体地说,我想知道将 OUTPUT_TOPIC
作为 KTable 进行流式传输是否会导致状态存储 (REKEYED_STORE
) 大于本地所需的大小。对于具有大量唯一键的变更日志主题,将它们作为 KStream
流式传输并进行 windowed 聚合会更好吗?或者这不会像我认为的那样减少足迹(例如,只有一部分记录 - window 中的记录会存在于本地状态存储中)。
无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:
- 对于具有这种吞吐量水平的 Kafka Streams 应用程序,是否应该考虑任何配置选项、一般策略等?
- 是否有关于单个实例的内存密集程度的指南?即使您的指导方针有些武断,与他人分享也可能会有所帮助。我的一个实例目前正在使用 15GB 内存 - 我不知道这是否 good/bad/doesn 无关紧要。
如有任何帮助,我们将不胜感激!
根据您当前的模式
stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)
你得到两家内容相同的商店。一个用于 reduce()
运算符,一个用于读取 table()
—— 虽然这可以减少到一个存储:
KTable rekeyedTable = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely
这应该会显着减少您的内存使用量。
关于 windowing 与非 windowing:
这是你需要的语义问题;如此简单地从非 windowed 切换到 windowed reduce 似乎是有问题的。
即使您也可以使用 windowed 语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即键 + currentAgg)。因此,对于单个密钥,两种情况下的存储要求是相同的(单个 window 具有相同的存储要求)。同时,如果您使用 windows,您实际上可能需要更多内存,因为您获得了聚合专业密钥 pro window(而在非 window 案例)。您可能会节省内存的唯一情况是您 'key space' 分散在很长一段时间内的情况。例如,您可能很长时间都得不到某些键的任何输入记录。在非 windowed 情况下,这些记录的聚合将一直存储,而对于 windowed 情况,key/agg 记录将被删除并重新输入如果稍后再次出现具有此键的记录,将重新创建(但请记住,在这种情况下您丢失了以前的聚合门 - 参见(1))
最后但并非最不重要的一点是,您可能想查看调整应用程序大小的指南:http://docs.confluent.io/current/streams/sizing.html
我有一个拓扑结构(见下文)可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用率非常高,我正在寻找一些关于如何减少状态存储占用空间的建议(下面有更多详细信息)。 注意:我并不是想逃避国家商店,我只是认为我可能有一种方法可以改进我的拓扑 - 见下文。
// stream receives 1 billion+ messages per day
stream
.flatMap((key, msg) -> rekeyMessages(msg))
.groupBy((key, value) -> key)
.reduce(new MyReducer(), MY_REDUCED_STORE)
.toStream()
.to(OUTPUT_TOPIC);
// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);
// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)
// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)
// etc
更具体地说,我想知道将 OUTPUT_TOPIC
作为 KTable 进行流式传输是否会导致状态存储 (REKEYED_STORE
) 大于本地所需的大小。对于具有大量唯一键的变更日志主题,将它们作为 KStream
流式传输并进行 windowed 聚合会更好吗?或者这不会像我认为的那样减少足迹(例如,只有一部分记录 - window 中的记录会存在于本地状态存储中)。
无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:
- 对于具有这种吞吐量水平的 Kafka Streams 应用程序,是否应该考虑任何配置选项、一般策略等?
- 是否有关于单个实例的内存密集程度的指南?即使您的指导方针有些武断,与他人分享也可能会有所帮助。我的一个实例目前正在使用 15GB 内存 - 我不知道这是否 good/bad/doesn 无关紧要。
如有任何帮助,我们将不胜感激!
根据您当前的模式
stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)
你得到两家内容相同的商店。一个用于 reduce()
运算符,一个用于读取 table()
—— 虽然这可以减少到一个存储:
KTable rekeyedTable = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely
这应该会显着减少您的内存使用量。
关于 windowing 与非 windowing:
这是你需要的语义问题;如此简单地从非 windowed 切换到 windowed reduce 似乎是有问题的。
即使您也可以使用 windowed 语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即键 + currentAgg)。因此,对于单个密钥,两种情况下的存储要求是相同的(单个 window 具有相同的存储要求)。同时,如果您使用 windows,您实际上可能需要更多内存,因为您获得了聚合专业密钥 pro window(而在非 window 案例)。您可能会节省内存的唯一情况是您 'key space' 分散在很长一段时间内的情况。例如,您可能很长时间都得不到某些键的任何输入记录。在非 windowed 情况下,这些记录的聚合将一直存储,而对于 windowed 情况,key/agg 记录将被删除并重新输入如果稍后再次出现具有此键的记录,将重新创建(但请记住,在这种情况下您丢失了以前的聚合门 - 参见(1))
最后但并非最不重要的一点是,您可能想查看调整应用程序大小的指南:http://docs.confluent.io/current/streams/sizing.html