日-时间间隔(day-time interval)的无效常量:org.apache.flink.table.api.ApiExpression
Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression
我有下面这段简单的代码,使用 Table API 执行基于处理时间的滚动窗口(tumbling window)聚合,但运行时抛出了异常。我看不懂这个异常的含义,有人可以帮忙看看吗?谢谢!
Stock 样例类(case class)的定义如下:
/** A single stock record: an identifier, a trade timestamp and a price. */
case class Stock(
  id: String,
  trade_date: Timestamp,
  price: Double
)
应用程序代码为:
package org.example.sqlcookbook
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{AnyWithOperations, FieldExpression, Tumble, lit}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.example.sources.StockSource
// Streaming job: sum `price` per `id` over 4-second processing-time tumbling windows.
// Execution environment with parallelism pinned to 1.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Sample source; emitInterval = 1500 makes it sleep 1500 ms after each record, print = false disables its console echo.
val ds = env.addSource(new StockSource(emitInterval = 1500, print = false))
val tenv = StreamTableEnvironment.create(env)
// Expose the stream as a table and append a processing-time attribute named "pt".
val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"pt".proctime())
// Tumbling window of 4 seconds over "pt", aliased as "w"; rows are grouped by id and window,
// and the query selects the id, the window start/end and the sum of price.
// NOTE(review): this statement is kept verbatim as the reproduction of the reported failure.
// Called from Scala, lit(4).second() raises
// ValidationException("Invalid constant for day-time interval: ...ApiExpression").
// The Scala DSL form is Tumble.over(4.second()).on($"pt").as("w"), used together with
// `import org.apache.flink.table.api._` so the Int literal gets the interval methods.
val result = table.window(Tumble.over(lit(4).second()).on($"pt").as("w"))
.groupBy($"id", $"w")
.select(
$"id",
$"w".start().as("w_start"),
$"w".`end`().as("w_end"),
$"price".sum().as("sum_price")
)
// Convert the result table to an append stream of Row, print it, and start the job.
result.toAppendStream[Row].print()
env.execute()
StockSource 是:
package org.example.sources
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.example.model.Stock
import org.example.utils.Implicits._
import scala.util.Try
/**
 * Source that emits every record of `Stock.stocks` once, in order, and then stays alive
 * until it is cancelled.
 *
 * Unlike a `while (true)` keep-alive loop, this implementation honours `cancel()`: both the
 * emission phase and the keep-alive phase stop as soon as `running` is cleared or the
 * thread is interrupted.
 *
 * @param emitInterval pause in milliseconds after each emitted record; no pause when <= 0
 * @param print        when true, each emitted record is also written to standard output
 */
class StockSource(emitInterval: Int = 0, print: Boolean = false) extends RichSourceFunction[Stock] {
  // Kept for compatibility with existing users of this public member; not used by run().
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // Cleared by cancel() and polled by run(); @volatile because the two calls are expected
  // to happen on different threads.
  @volatile var running = true

  override def run(sc: SourceContext[Stock]): Unit = {
    // Emit the fixed sample collection, stopping early if the source is cancelled.
    Stock.stocks.iterator.takeWhile(_ => running).foreach { stock =>
      sc.collect(stock)
      if (print) {
        System.out.println(s"Source-${stock.trade_date.to_str}-${stock.price}")
      }
      if (emitInterval > 0) {
        Thread.sleep(emitInterval)
      }
    }

    // Keep the source alive after the last record so the job does not finish, but poll
    // the flag in short sleeps so cancellation is noticed promptly.
    while (running) {
      try Thread.sleep(100L)
      catch {
        case _: InterruptedException =>
          // Restore the interrupt status for the caller and leave the loop.
          Thread.currentThread().interrupt()
          running = false
      }
    }
  }

  /** Requests termination; `run` returns shortly after this is called. */
  override def cancel(): Unit = {
    running = false
  }
}
Stock 对象是:
/** Companion holding the fixed sample records used by the example source. */
object Stock {
  // (trade timestamp text, price) pairs, in emission order; every record uses the id "id1".
  val stocks: Seq[Stock] = Seq(
    ("2020-09-16 20:50:15", 1.0),
    ("2020-09-16 20:50:12", 2.0),
    ("2020-09-16 20:50:11", 3.0),
    ("2020-09-16 20:50:18", 4.0),
    ("2020-09-16 20:50:13", 5.0),
    ("2020-09-16 20:50:20", 6.0),
    ("2020-09-16 20:50:14", 7.0),
    ("2020-09-16 20:50:22", 8.0),
    ("2020-09-16 20:50:40", 9.0)
  ).map { case (tradeDate, price) => Stock("id1", tradeDate.ts, price) }
}
异常信息为:
Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression@731692
org.apache.flink.table.api.ValidationException: Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression@731692
at org.apache.flink.table.expressions.ApiExpressionUtils.lambda$toMilliInterval(ApiExpressionUtils.java:315)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInterval(ApiExpressionUtils.java:315)
at org.apache.flink.table.api.internal.BaseExpressions.second(BaseExpressions.java:1193)
at org.example.sqlcookbook.T006_GroupByProcessWindow$$anonfun.apply(T006_GroupByProcessWindow.scala:42)
at org.example.sqlcookbook.T006_GroupByProcessWindow$$anonfun.apply(T006_GroupByProcessWindow.scala:36)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon.apply(AnyFunSuiteLike.scala:189)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$class.invokeWithFixture(AnyFunSuiteLike.scala:186)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest.apply(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest.apply(AnyFunSuiteLike.scala:199)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike$class.runTest(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests.apply(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests.apply(AnyFunSuiteLike.scala:232)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike$class.runTests(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
at org.scalatest.Suite$class.run(Suite.scala:1112)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run.apply(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run.apply(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike$class.run(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuite.run(AnyFunSuite.scala:1562)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1320)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1314)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:972)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
错误消息确实不够清楚。实际发生的是:滚动窗口(tumbling window)表达式的 Java 写法与 Scala 写法之间存在冲突。
下面是滚动窗口的 Java 写法,Scala API 似乎不接受它:
// this does not work in scala:
// val result = table.window(Tumble.over(lit(4).second()).on($"pt").as("w"))
不知何故,从 Scala 调用时,“4”似乎被多包装了一层,因此无法转换为“4 秒”的时间间隔,
改用下面的 Scala 写法即可解决:
import org.apache.flink.table.api._
...
// this works in scala:
table.window(Tumble.over(4.second()).on($"pt").as("w"))
在这两种情况下,调用的是同一个 second()
函数,尽管在第二种情况下,它的参数具有预期的类型。
另外,你也可以使用下面这种更简洁的中缀写法:
val result = table.window(Tumble over 4.second on $"pt" as "w")
我有下面这段简单的代码,使用 Table API 执行基于处理时间的滚动窗口(tumbling window)聚合,但运行时抛出了异常。我看不懂这个异常的含义,有人可以帮忙看看吗?谢谢!
Stock 样例类(case class)的定义如下:
/** A single stock record: an identifier, a trade timestamp and a price. */
case class Stock(
  id: String,
  trade_date: Timestamp,
  price: Double
)
应用程序代码为:
package org.example.sqlcookbook
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{AnyWithOperations, FieldExpression, Tumble, lit}
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
import org.example.sources.StockSource
// Streaming job: sum `price` per `id` over 4-second processing-time tumbling windows.
// Execution environment with parallelism pinned to 1.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Sample source; emitInterval = 1500 makes it sleep 1500 ms after each record, print = false disables its console echo.
val ds = env.addSource(new StockSource(emitInterval = 1500, print = false))
val tenv = StreamTableEnvironment.create(env)
// Expose the stream as a table and append a processing-time attribute named "pt".
val table = tenv.fromDataStream(ds, $"id", $"trade_date", $"price", $"pt".proctime())
// Tumbling window of 4 seconds over "pt", aliased as "w"; rows are grouped by id and window,
// and the query selects the id, the window start/end and the sum of price.
// NOTE(review): this statement is kept verbatim as the reproduction of the reported failure.
// Called from Scala, lit(4).second() raises
// ValidationException("Invalid constant for day-time interval: ...ApiExpression").
// The Scala DSL form is Tumble.over(4.second()).on($"pt").as("w"), used together with
// `import org.apache.flink.table.api._` so the Int literal gets the interval methods.
val result = table.window(Tumble.over(lit(4).second()).on($"pt").as("w"))
.groupBy($"id", $"w")
.select(
$"id",
$"w".start().as("w_start"),
$"w".`end`().as("w_end"),
$"price".sum().as("sum_price")
)
// Convert the result table to an append stream of Row, print it, and start the job.
result.toAppendStream[Row].print()
env.execute()
StockSource 是:
package org.example.sources
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.example.model.Stock
import org.example.utils.Implicits._
import scala.util.Try
/**
 * Source that emits every record of `Stock.stocks` once, in order, and then stays alive
 * until it is cancelled.
 *
 * Unlike a `while (true)` keep-alive loop, this implementation honours `cancel()`: both the
 * emission phase and the keep-alive phase stop as soon as `running` is cleared or the
 * thread is interrupted.
 *
 * @param emitInterval pause in milliseconds after each emitted record; no pause when <= 0
 * @param print        when true, each emitted record is also written to standard output
 */
class StockSource(emitInterval: Int = 0, print: Boolean = false) extends RichSourceFunction[Stock] {
  // Kept for compatibility with existing users of this public member; not used by run().
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  // Cleared by cancel() and polled by run(); @volatile because the two calls are expected
  // to happen on different threads.
  @volatile var running = true

  override def run(sc: SourceContext[Stock]): Unit = {
    // Emit the fixed sample collection, stopping early if the source is cancelled.
    Stock.stocks.iterator.takeWhile(_ => running).foreach { stock =>
      sc.collect(stock)
      if (print) {
        System.out.println(s"Source-${stock.trade_date.to_str}-${stock.price}")
      }
      if (emitInterval > 0) {
        Thread.sleep(emitInterval)
      }
    }

    // Keep the source alive after the last record so the job does not finish, but poll
    // the flag in short sleeps so cancellation is noticed promptly.
    while (running) {
      try Thread.sleep(100L)
      catch {
        case _: InterruptedException =>
          // Restore the interrupt status for the caller and leave the loop.
          Thread.currentThread().interrupt()
          running = false
      }
    }
  }

  /** Requests termination; `run` returns shortly after this is called. */
  override def cancel(): Unit = {
    running = false
  }
}
Stock 对象是:
/** Companion holding the fixed sample records used by the example source. */
object Stock {
  // (trade timestamp text, price) pairs, in emission order; every record uses the id "id1".
  val stocks: Seq[Stock] = Seq(
    ("2020-09-16 20:50:15", 1.0),
    ("2020-09-16 20:50:12", 2.0),
    ("2020-09-16 20:50:11", 3.0),
    ("2020-09-16 20:50:18", 4.0),
    ("2020-09-16 20:50:13", 5.0),
    ("2020-09-16 20:50:20", 6.0),
    ("2020-09-16 20:50:14", 7.0),
    ("2020-09-16 20:50:22", 8.0),
    ("2020-09-16 20:50:40", 9.0)
  ).map { case (tradeDate, price) => Stock("id1", tradeDate.ts, price) }
}
异常信息为:
Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression@731692
org.apache.flink.table.api.ValidationException: Invalid constant for day-time interval: org.apache.flink.table.api.ApiExpression@731692
at org.apache.flink.table.expressions.ApiExpressionUtils.lambda$toMilliInterval(ApiExpressionUtils.java:315)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.table.expressions.ApiExpressionUtils.toMilliInterval(ApiExpressionUtils.java:315)
at org.apache.flink.table.api.internal.BaseExpressions.second(BaseExpressions.java:1193)
at org.example.sqlcookbook.T006_GroupByProcessWindow$$anonfun.apply(T006_GroupByProcessWindow.scala:42)
at org.example.sqlcookbook.T006_GroupByProcessWindow$$anonfun.apply(T006_GroupByProcessWindow.scala:36)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.funsuite.AnyFunSuiteLike$$anon.apply(AnyFunSuiteLike.scala:189)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.funsuite.AnyFunSuite.withFixture(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$class.invokeWithFixture(AnyFunSuiteLike.scala:186)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest.apply(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTest.apply(AnyFunSuiteLike.scala:199)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.funsuite.AnyFunSuiteLike$class.runTest(AnyFunSuiteLike.scala:199)
at org.scalatest.funsuite.AnyFunSuite.runTest(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests.apply(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$runTests.apply(AnyFunSuiteLike.scala:232)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:413)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes.apply(Engine.scala:401)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes(Engine.scala:401)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.funsuite.AnyFunSuiteLike$class.runTests(AnyFunSuiteLike.scala:232)
at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
at org.scalatest.Suite$class.run(Suite.scala:1112)
at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run.apply(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuiteLike$$anonfun$run.apply(AnyFunSuiteLike.scala:236)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.funsuite.AnyFunSuiteLike$class.run(AnyFunSuiteLike.scala:236)
at org.scalatest.funsuite.AnyFunSuite.run(AnyFunSuite.scala:1562)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1320)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun.apply(Runner.scala:1314)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:972)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter.apply(Runner.scala:971)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:40)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:27)
错误消息确实不够清楚。实际发生的是:滚动窗口(tumbling window)表达式的 Java 写法与 Scala 写法之间存在冲突。
下面是滚动窗口的 Java 写法,Scala API 似乎不接受它:
// this does not work in scala:
// val result = table.window(Tumble.over(lit(4).second()).on($"pt").as("w"))
不知何故,从 Scala 调用时,“4”似乎被多包装了一层,因此无法转换为“4 秒”的时间间隔,
改用下面的 Scala 写法即可解决:
import org.apache.flink.table.api._
...
// this works in scala:
table.window(Tumble.over(4.second()).on($"pt").as("w"))
在这两种情况下,调用的是同一个 second()
函数,尽管在第二种情况下,它的参数具有预期的类型。
另外,你也可以使用下面这种更简洁的中缀写法:
val result = table.window(Tumble over 4.second on $"pt" as "w")