如何从 select * Flink 制作 select 字段
How to make select field from select * Flink
我正在尝试加入 2 个选择。
我必须在代码中进行查询,看起来像这个查询
select *
from Data
where numPers > 10 && Object = P1
还有这个
select *
from Data
where numPers < 20 && Object == P1
而且我只需要数据中的时间戳而不重复
我使用的程序代码如下所示
object Prog {
def main(args: Array[String]) : Unit = {
org.apache.log4j.BasicConfigurator.configure()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val csvTableSource = CsvTableSource
.builder
.path("src/main/resources/data.stream")
.field("numPers", Types.INT)
.field("Object", Types.STRING)
.field("TIMESTAMP", Types.STRING)
.fieldDelimiter(",")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
.build()
tableEnv.registerTableSource("Data", csvTableSource)
val table = tableEnv.scan("Data") //this works
.filter("numPers > 10")
.select("*")
val ds = tableEnv.toAppendStream(table, classOf[Row])
ds.print()
env.execute()
}
}
但是如何将第二个查询添加到第一个?
如果我正确理解您的要求,您不需要连接,只需要 BETWEEN
谓词:
val query = "SELECT * FROM Data WHERE numPers BETWEEN 10 AND 20 AND Object = P1"
val table = tableEnv.sqlQuery(query)
我正在尝试加入 2 个选择。
我必须在代码中进行查询,看起来像这个查询
select *
from Data
where numPers > 10 && Object = P1
还有这个
select *
from Data
where numPers < 20 && Object == P1
而且我只需要数据中的时间戳而不重复
我使用的程序代码如下所示
object Prog {
def main(args: Array[String]) : Unit = {
org.apache.log4j.BasicConfigurator.configure()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
val csvTableSource = CsvTableSource
.builder
.path("src/main/resources/data.stream")
.field("numPers", Types.INT)
.field("Object", Types.STRING)
.field("TIMESTAMP", Types.STRING)
.fieldDelimiter(",")
.ignoreFirstLine
.ignoreParseErrors
.commentPrefix("%")
.build()
tableEnv.registerTableSource("Data", csvTableSource)
val table = tableEnv.scan("Data") //this works
.filter("numPers > 10")
.select("*")
val ds = tableEnv.toAppendStream(table, classOf[Row])
ds.print()
env.execute()
}
}
但是如何将第二个查询添加到第一个?
如果我正确理解您的要求,您不需要连接,只需要 BETWEEN
谓词:
val query = "SELECT * FROM Data WHERE numPers BETWEEN 10 AND 20 AND Object = P1"
val table = tableEnv.sqlQuery(query)