Apache Flink的两种类型window,时间和"count"windows
Apache Flink two types of window, time and "count" windows
我正在尝试使用 window.
将文件作为流处理
这是代码
object Prog {
def main(args: Array[String]) : Unit = {
org.apache.log4j.BasicConfigurator.configure()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val csvTableSource = CsvTableSource
.builder
.path("src/main/resources/data.stream")
.field("numPers", Types.INT)
.field("TIMESTAMP", Types.STRING)
.fieldDelimiter(",")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
.build()
tableEnv.registerTableSource("Data", csvTableSource)
val table = tableEnv.scan("Data")
.filter("numPers > 10")
.select("*")
val ds = tableEnv.toAppendStream(table, classOf[Row])
ds.print()
env.execute()
}
}
问题是如何在此处实现 window,例如,仅显示不早于一小时的值。
或者第二种 window 类型,当我读到最后 50 个条目时。
在流处理中,windows 是计算聚合的组。
您的用例似乎有所不同。如果您想保留最后 x
分钟或 y
最后记录,则需要在 SQL.
中以不同方式表达
保留最后 5
分钟类似于
SELECT * FROM Data d WHERE d.tstamp > (now() - INTERVAL '5' MINUTE)
因此,这将是某种时间戳属性的过滤器。
保留最后 10 行是
SELECT * FROM Data d ORDER BY d.tstamp DESC LIMIT 10
但是,none这些操作还被Flink(1.5版)支持SQL或TableAPI .
我正在尝试使用 window.
将文件作为流处理这是代码
object Prog {
def main(args: Array[String]) : Unit = {
org.apache.log4j.BasicConfigurator.configure()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val csvTableSource = CsvTableSource
.builder
.path("src/main/resources/data.stream")
.field("numPers", Types.INT)
.field("TIMESTAMP", Types.STRING)
.fieldDelimiter(",")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
.build()
tableEnv.registerTableSource("Data", csvTableSource)
val table = tableEnv.scan("Data")
.filter("numPers > 10")
.select("*")
val ds = tableEnv.toAppendStream(table, classOf[Row])
ds.print()
env.execute()
}
}
问题是如何在此处实现 window,例如,仅显示不早于一小时的值。 或者第二种 window 类型,当我读到最后 50 个条目时。
在流处理中,windows 是计算聚合的组。
您的用例似乎有所不同。如果您想保留最后 x
分钟或 y
最后记录,则需要在 SQL.
保留最后 5
分钟类似于
SELECT * FROM Data d WHERE d.tstamp > (now() - INTERVAL '5' MINUTE)
因此,这将是某种时间戳属性的过滤器。
保留最后 10 行是
SELECT * FROM Data d ORDER BY d.tstamp DESC LIMIT 10
但是,none这些操作还被Flink(1.5版)支持SQL或TableAPI .