如何在 Flink 中对 String DataStream 执行 timeWindow()?
How to perform timeWindow() on String DataStream in Flink?
我想在 Apache Flink 中做一次 window 流数据。我的数据看起来有点像这样:
1> {52,"mokshda",84.85}
2> {1,"kavita",26.16}
2> {131,"nidhi",178.9}
3> {2,"poorvi",22.97}
4> {115,"saheba",110.41}
每 20 秒,我想要所有行的分数总和(最后一列,例如 Mokshda 的分数是 84.85)。 timeWindow() 函数在 KeyedStream 上运行,因此我必须 keyBy() 这个 DataStream。我可以按卷号(第一列,例如 Mokshda 为 52)键入它。
val windowedStream = stockStream
.keyBy(0)
.timeWindow(Time.seconds(20))
.sum(2)
但显然,Flink 并没有将我的数据作为列表来读取。它将它作为字符串读取,因此,我得到以下异常:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: String
如何对字符串数据执行 timeWindow,或者如何将此数据转换为元组?
您可以使用 MapFunction[String, (Int, String, Double)]
将 DataStream[String]
转换为 DataStream[(Int, String, Double)]
,后者将 String 解析为其组件,转换数据类型并发出 Tuple
。
您还可以在非键控数据流上应用 timeWindowAll
。但是,语义当然不同,AllWindow 只能以并行度 1 处理。
我想在 Apache Flink 中做一次 window 流数据。我的数据看起来有点像这样:
1> {52,"mokshda",84.85}
2> {1,"kavita",26.16}
2> {131,"nidhi",178.9}
3> {2,"poorvi",22.97}
4> {115,"saheba",110.41}
每 20 秒,我想要所有行的分数总和(最后一列,例如 Mokshda 的分数是 84.85)。 timeWindow() 函数在 KeyedStream 上运行,因此我必须 keyBy() 这个 DataStream。我可以按卷号(第一列,例如 Mokshda 为 52)键入它。
val windowedStream = stockStream
.keyBy(0)
.timeWindow(Time.seconds(20))
.sum(2)
但显然,Flink 并没有将我的数据作为列表来读取。它将它作为字符串读取,因此,我得到以下异常:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: String
如何对字符串数据执行 timeWindow,或者如何将此数据转换为元组?
您可以使用 MapFunction[String, (Int, String, Double)]
将 DataStream[String]
转换为 DataStream[(Int, String, Double)]
,后者将 String 解析为其组件,转换数据类型并发出 Tuple
。
您还可以在非键控数据流上应用 timeWindowAll
。但是,语义当然不同,AllWindow 只能以并行度 1 处理。