Apache Kafka Streams 中特定分区的聚合
Aggregation over a specific partition in Apache Kafka Streams
假设我有一个名为 SensorData
的 Kafka 主题,两个传感器 S1 和 S2 将数据(时间戳和值)发送到两个不同的分区,例如S1 -> P1 和 S2 -> P2。现在我需要分别汇总这两个传感器的值,假设计算 1 小时 window 内的平均传感器值并将其写入新主题 SensorData1Hour
。有了这个场景
- 如何使用
KStreamBuilder#stream
方法 select 特定主题分区?
- 是否可以对同一主题的两个(多个)不同分区应用一些聚合函数?
您不能(直接)访问单个分区,也不能(直接)对多个分区应用聚合函数。
总是按照 key
进行聚合:http://docs.confluent.io/current/streams/developer-guide.html#stateful-transformations
- 因此,您可以为每个分区使用不同的密钥,而不是按密钥聚合。参见 http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
最简单的方法是让您的每个生产者立即为每条消息应用一个密钥。
- 如果要聚合多个分区,首先需要设置一个新的key(例如,使用
selectKey()
),并为所有要聚合的数据设置相同的key(如果要聚合所有分区,您将使用单个键值——但是,请记住,这可能很快成为瓶颈!)。
假设我有一个名为 SensorData
的 Kafka 主题,两个传感器 S1 和 S2 将数据(时间戳和值)发送到两个不同的分区,例如S1 -> P1 和 S2 -> P2。现在我需要分别汇总这两个传感器的值,假设计算 1 小时 window 内的平均传感器值并将其写入新主题 SensorData1Hour
。有了这个场景
- 如何使用
KStreamBuilder#stream
方法 select 特定主题分区? - 是否可以对同一主题的两个(多个)不同分区应用一些聚合函数?
您不能(直接)访问单个分区,也不能(直接)对多个分区应用聚合函数。
总是按照 key
进行聚合:http://docs.confluent.io/current/streams/developer-guide.html#stateful-transformations
- 因此,您可以为每个分区使用不同的密钥,而不是按密钥聚合。参见 http://docs.confluent.io/current/streams/developer-guide.html#windowing-a-stream
最简单的方法是让您的每个生产者立即为每条消息应用一个密钥。
- 如果要聚合多个分区,首先需要设置一个新的key(例如,使用
selectKey()
),并为所有要聚合的数据设置相同的key(如果要聚合所有分区,您将使用单个键值——但是,请记住,这可能很快成为瓶颈!)。