day-time 间隔的无效常量:org.apache.flink.table.api.ApiExpression

Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression

我有以下简单代码,使用 Table API 执行基于处理时间的滚动窗口(tumbling window),但运行时抛出了异常。我看不懂这个异常信息的含义,有人可以帮忙看看吗?谢谢!

Stock 样例类(case class)定义如下:

/** One stock record: an identifier, its `trade_date` timestamp and a price. */
case class Stock(
  id: String,
  trade_date: Timestamp,
  price: Double
)

应用程序代码为:

package org.example.sqlcookbook

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{AnyWithOperations, FieldExpression, Tumble, lit}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.example.sources.StockSource


    // Set up the streaming environment and run everything with a parallelism of 1.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // StockSource emits the sample records, sleeping 1500 ms after each one, without printing them.
    val ds = env.addSource(new StockSource(emitInterval = 1500, print = false))
    val tenv = StreamTableEnvironment.create(env)
    // Expose the stream as a table and append a processing-time attribute named "pt".
    val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"pt".proctime())
    // NOTE: `lit(4).second()` is the call that raises the reported
    // ValidationException ("Invalid constant for day-time interval").
    val result = table.window(Tumble.over(lit(4).second()).on($"pt").as("w"))
      .groupBy($"id", $"w")
      // For each id and window: the window start, the window end and the sum of price.
      .select(
        $"id",
        $"w".start().as("w_start"),
        $"w".`end`().as("w_end"),
        $"price".sum().as("sum_price")
      )

    // Convert the result table to an append stream of Row, print it, and start the job.
    result.toAppendStream[Row].print()
    env.execute()

StockSource 是:

package org.example.sources

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.example.model.Stock
import  org.example.utils.Implicits._
import scala.util.Try

/**
 * Source that emits every element of `Stock.stocks` once and then stays idle until it is
 * cancelled.
 *
 * @param emitInterval pause in milliseconds after each emitted record; no pause unless positive
 * @param print        when true, each emitted record is also written to standard output
 */
class StockSource(emitInterval: Int = 0, print: Boolean = false) extends RichSourceFunction[Stock] {
  // Not referenced inside this class; kept because it is a public member.
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // Cleared by cancel(), which may be invoked from a different thread than run(); hence @volatile.
  @volatile private var active = true

  /** True until `cancel()` has been called. */
  def running: Boolean = active

  /**
   * Emits the stock collection and then idles until the source is cancelled.
   * An interruption of the executing thread while sleeping propagates as an
   * `InterruptedException`.
   */
  override def run(sc: SourceContext[Stock]): Unit = {
    // How often the idle phase re-checks the cancellation flag.
    val idlePollMillis = 100L

    // Stock collection: stop early if the source is cancelled in the middle of it.
    val pending = Stock.stocks.iterator
    while (active && pending.hasNext) {
      val stock = pending.next()
      sc.collect(stock)

      if (print) {
        System.out.println(s"Source-${stock.trade_date.to_str}-${stock.price}")
      }

      if (emitInterval > 0) {
        Thread.sleep(emitInterval)
      }
    }

    // Keep running after the data is exhausted, waking up regularly so that cancel() is noticed.
    while (active) {
      Thread.sleep(idlePollMillis)
    }
  }

  /** Asks `run` to return by clearing the running flag. */
  override def cancel(): Unit = {
    active = false
  }
}

Stock 对象是:

/** Holds the fixed sample `Stock` records. */
object Stock {

  // Each pair is (trade timestamp text, price), kept in the original order;
  // every record uses the id "id1".
  val stocks = Seq(
    "2020-09-16 20:50:15" -> 1.0,
    "2020-09-16 20:50:12" -> 2.0,
    "2020-09-16 20:50:11" -> 3.0,
    "2020-09-16 20:50:18" -> 4.0,
    "2020-09-16 20:50:13" -> 5.0,
    "2020-09-16 20:50:20" -> 6.0,
    "2020-09-16 20:50:14" -> 7.0,
    "2020-09-16 20:50:22" -> 8.0,
    "2020-09-16 20:50:40" -> 9.0
  ).map { case (tradeDate, price) => Stock("id1", tradeDate.ts, price) }

}

异常信息为:

Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression@731692
org.apache.flink.table.api.ValidationException: Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression@731692
    at org.apache.flink.table.expressions.ApiExpressionUtils.lambda$toMilliInterval(ApiExpressionUtils.java:315)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInterval(ApiExpressionUtils.java:315)
    at org.apache.flink.table.api.internal.BaseExpressions.second(BaseExpressions.java:1193)
    at org.example.sqlcookbook.T006_GroupByProcessWindow$$anonfun.apply(T006_GroupByProcessWindow.scala:42)
    at org.example.sqlcookbook.T006_GroupByProcessWindow$$anonfun.apply(T006_GroupByProcessWindow.scala:36)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anon.apply(AnyFunSuiteLike.scala:189)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
    at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
    at org.scalatest.funsuite.AnyFunSuiteLike$class.invokeWithFixture(AnyFunSuiteLike.scala:186)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest.apply(AnyFunSuiteLike.scala:199)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest.apply(AnyFunSuiteLike.scala:199)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.funsuite.AnyFunSuiteLike$class.runTest(AnyFunSuiteLike.scala:199)
    at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1562)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests.apply(AnyFunSuiteLike.scala:232)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests.apply(AnyFunSuiteLike.scala:232)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
    at org.scalatest.funsuite.AnyFunSuiteLike$class.runTests(AnyFunSuiteLike.scala:232)
    at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
    at org.scalatest.Suite$class.run(Suite.scala:1112)
    at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run.apply(AnyFunSuiteLike.scala:236)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run.apply(AnyFunSuiteLike.scala:236)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
    at org.scalatest.funsuite.AnyFunSuiteLike$class.run(AnyFunSuiteLike.scala:236)
    at org.scalatest.funsuite.AnyFunSuite.run(AnyFunSuite.scala:1562)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1320)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1314)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:972)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:971)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
    at org.scalatest.tools.Runner$.run(Runner.scala:798)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)

错误消息确实不够清晰;实际发生的情况是,滚动窗口(tumbling window)表达式的 Java 写法与 Scala 写法之间产生了冲突。

下面是 tumbling window 的 Java 写法,Scala API 似乎不接受这种写法:

// this does not work in scala:
//  val result = table.window(Tumble.over(lit(4).second()).on($"pt").as("w"))

不知何故,从 Scala 调用时,“4”似乎被多包装了一层,因此无法转换为“4 秒”的时间间隔。

改用下面的 Scala 写法即可解决:

import org.apache.flink.table.api._

...
// This works in Scala (with `import org.apache.flink.table.api._` in scope):
// second() is called on the Int literal 4 directly, without wrapping it in lit(...).
table.window(Tumble.over(4.second()).on($"pt").as("w"))

两种情况下调用的是同一个 second() 函数,只是在第二种情况下,它的参数具有预期的类型。

另外,您还可以使用下面这种更简洁、有趣的写法:

// The window definition written with infix method calls instead of dots and parentheses.
val result = table.window(Tumble over 4.second on $"pt" as "w")