如何从 select * Flink 制作 select 字段

How to make select field from select * Flink

我正在尝试加入 2 个选择。

我必须在代码中进行查询,看起来像这个查询

select *
from  Data
where numPers > 10 && Object = P1

还有这个

select *
from  Data
where numPers < 20 && Object == P1

而且我只需要数据中的时间戳而不重复

我使用的程序代码如下所示

object Prog {

  def main(args: Array[String]) : Unit = {
    org.apache.log4j.BasicConfigurator.configure()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = CsvTableSource
      .builder
      .path("src/main/resources/data.stream")
      .field("numPers", Types.INT)
      .field("Object", Types.STRING)
      .field("TIMESTAMP", Types.STRING)
      .fieldDelimiter(",")
      .ignoreFirstLine
      .ignoreParseErrors
      .commentPrefix("%")
      .build()

    tableEnv.registerTableSource("Data", csvTableSource)

    val table = tableEnv.scan("Data") //this works
      .filter("numPers > 10")
      .select("*")


    val ds = tableEnv.toAppendStream(table, classOf[Row])

    ds.print()
    env.execute()
  }
}

但是如何将第二个查询添加到第一个?

如果我正确理解您的要求,您不需要连接,只需要 BETWEEN 谓词:

val query = "SELECT * FROM Data WHERE numPers BETWEEN 10 AND 20 AND Object = P1"
val table = tableEnv.sqlQuery(query)