Kafka Streams - 减少大型状态存储的内存占用

Kafka Streams - reducing the memory footprint for large state stores

我有一个拓扑结构(见下文)可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用率非常高,我正在寻找一些关于如何减少状态存储占用空间的建议(下面有更多详细信息)。 注意:我并不是想逃避国家商店,我只是认为我可能有一种方法可以改进我的拓扑 - 见下文。

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc

更具体地说,我想知道将 OUTPUT_TOPIC 作为 KTable 进行流式传输是否会导致状态存储 (REKEYED_STORE) 大于本地所需的大小。对于具有大量唯一键的变更日志主题,将它们作为 KStream 流式传输并进行 windowed 聚合会更好吗?或者这不会像我认为的那样减少足迹(例如,只有一部分记录 - window 中的记录会存在于本地状态存储中)。

无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:

如有任何帮助,我们将不胜感激!

根据您当前的模式

stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)

你得到两家内容相同的商店。一个用于 reduce() 运算符,一个用于读取 table() —— 虽然这可以减少到一个存储:

KTable rekeyedTable  = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely

这应该会显着减少您的内存使用量。

关于 windowing 与非 windowing:

  1. 这是你需要的语义问题;如此简单地从非 windowed 切换到 windowed reduce 似乎是有问题的。

  2. 即使您也可以使用 windowed 语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即键 + currentAgg)。因此,对于单个密钥,两种情况下的存储要求是相同的(单个 window 具有相同的存储要求)。同时,如果您使用 windows,您实际上可能需要更多内存,因为您获得了聚合专业密钥 pro window(而在非 window 案例)。您可能会节省内存的唯一情况是您 'key space' 分散在很长一段时间内的情况。例如,您可能很长时间都得不到某些键的任何输入记录。在非 windowed 情况下,这些记录的聚合将一直存储,而对于 windowed 情况,key/agg 记录将被删除并重新输入如果稍后再次出现具有此键的记录,将重新创建(但请记住,在这种情况下您丢失了以前的聚合门 - 参见(1))

最后但并非最不重要的一点是,您可能想查看调整应用程序大小的指南:http://docs.confluent.io/current/streams/sizing.html