在 Flink 中,stream windowing 好像不行?
In Flink, stream windowing does not seem to work?
我试图增强显示流用法的 Flink 示例。
我的目标是使用 windowing 功能(参见 window
函数调用)。
我假设下面的代码输出流的最后 3 个数字的总和。
(感谢 nc -lk 9999
在 ubuntu 上打开了流)
实际上,输出汇总了所有输入的数字。切换到时间 window 会产生相同的结果,即不会产生 windowing。
这是一个错误吗? (使用的版本:github 上的最新主控)
object SocketTextStreamWordCount {
def main(args: Array[String]) {
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port)
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
// .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
.map { (x:String) => ("not used; just to have a tuple for the sum", x.toInt) }
val numberOfItems = currentMap.count
numberOfItems print
val counts = currentMap.sum( 1 )
counts print
env.execute("Scala SocketTextStreamWordCount Example")
}
}
问题似乎是存在从 WindowedDataStream
到 DataStream
的隐式转换。此隐式转换在 WindowedDataStream
.
上调用 flatten()
您的情况是代码扩展为:
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
.flatten()
.map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }
flatten()
的作用类似于集合上的 flatMap()
。它将可以看作集合集合([[a,b,c], [d,e,f]]
)的 windows 流转换为元素流:[a,b,c,d,e,f]
.
这意味着您的计数实际上仅在已窗口化的原始流上运行 "de-windowed"。这看起来好像从来没有开窗过。
这是个问题,我会立即着手解决。 (我是 Flink 的提交者之一。)你可以在这里跟踪进度:https://issues.apache.org/jira/browse/FLINK-2096
当前 API 的方法是这样的:
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\W+") }
.filter { (x:String) => x.nonEmpty }
.map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
WindowedDataStream
有一个 sum() 方法,因此不会隐式插入 flatten() 调用。不幸的是,count()
在 WindowedDataStream
上不可用,因此为此您必须手动向元组添加一个 1
字段并计算这些。
我试图增强显示流用法的 Flink 示例。
我的目标是使用 windowing 功能(参见 window
函数调用)。
我假设下面的代码输出流的最后 3 个数字的总和。
(感谢 nc -lk 9999
在 ubuntu 上打开了流)
实际上,输出汇总了所有输入的数字。切换到时间 window 会产生相同的结果,即不会产生 windowing。
这是一个错误吗? (使用的版本:github 上的最新主控)
object SocketTextStreamWordCount {
def main(args: Array[String]) {
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create streams for names and ages by mapping the inputs to the corresponding objects
val text = env.socketTextStream(hostName, port)
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
// .window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
.map { (x:String) => ("not used; just to have a tuple for the sum", x.toInt) }
val numberOfItems = currentMap.count
numberOfItems print
val counts = currentMap.sum( 1 )
counts print
env.execute("Scala SocketTextStreamWordCount Example")
}
}
问题似乎是存在从 WindowedDataStream
到 DataStream
的隐式转换。此隐式转换在 WindowedDataStream
.
flatten()
您的情况是代码扩展为:
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\W+") }
.filter { (x:String) => x.nonEmpty }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
.flatten()
.map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }
flatten()
的作用类似于集合上的 flatMap()
。它将可以看作集合集合([[a,b,c], [d,e,f]]
)的 windows 流转换为元素流:[a,b,c,d,e,f]
.
这意味着您的计数实际上仅在已窗口化的原始流上运行 "de-windowed"。这看起来好像从来没有开窗过。
这是个问题,我会立即着手解决。 (我是 Flink 的提交者之一。)你可以在这里跟踪进度:https://issues.apache.org/jira/browse/FLINK-2096
当前 API 的方法是这样的:
val currentMap = text.flatMap { (x:String) => x.toLowerCase.split("\W+") }
.filter { (x:String) => x.nonEmpty }
.map { (x:String) => ("not used; just to have a tuple for the sum",x.toInt) }
.window(Count.of(3)).every(Time.of(1, TimeUnit.SECONDS))
WindowedDataStream
有一个 sum() 方法,因此不会隐式插入 flatten() 调用。不幸的是,count()
在 WindowedDataStream
上不可用,因此为此您必须手动向元组添加一个 1
字段并计算这些。