在 Dataset<Row> 对象中转换 Flink CEP table 时出错
Error converting Flink CEP table in Dataset<Row> object
我在 Apache Flink
中创建了一个 BatchTableEnviroment
并创建了一个 Table
对象,我在其中加载了数据。现在我想搜索一些模式。我正在使用 Detecting patterns in Tables CEP library
执行此操作。下面查询的任务是找到 mGroup
中 avgResult
未低于特定阈值的最长时间段。其中 mGroup
是 Integer
值,例如 100、200、300 等。Avgresult
是双精度值。当我编译查询部分时,我没有收到任何错误。当我将 Table
转换为 DataSet<row>
时出现错误。在查询下方,您可以看到错误消息。
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);
Table trendTable = tableEnv.sqlQuery(
" SELECT * " +
" FROM tableAvg " +
" MATCH_RECOGNIZE(" +
" PARTITION BY mType " +
" ORDER BY mGroup " +
" MEASURES " +
" FIRST(A.mGroup) as startGr, " +
" LAST(A.mGroup) as endGr, " +
" A.avgResult as avgRes" +
" ONE ROW PER MATCH " +
" AFTER MATCH SKIP PAST LAST ROW " +
" PATTERN (A+ B) " +
" DEFINE " +
" A AS A.avgResult < 50 " +
") "
);
tableEnv.registerTable("TrendTable", trendTable);
DataSet<Row> result = tableEnv.toDataSet(trendTable, Row.class);
/////////////////////ERROR MESSAGE BELOW
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
FlinkLogicalMatch(partition=[[]], order=[[2]], outputFields=[[mType, startGr, endGr, avgRes]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[<(PREV(A.[=12=], 0), 50)]], inputFields=[[sumResult, mType, EXPR]])
FlinkLogicalSort(sort0=[], dir0=[ASC])
FlinkLogicalCalc(expr#0..5=[{inputs}], expr#6=[/($t2, $t3)], expr#7=[99], expr#8=[>($t5, $t7)], sumResult=[$t6], mType=[$t1], EXPR=[$t4], $condition=[$t8])
FlinkLogicalAggregate(group=[{0, 1}], agg#0=[SUM()], agg#1=[SUM()], agg#2=[MAX()], agg#3=[COUNT()])
FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[-($t0, $t3)], expr#5=[100], expr#6=[/($t4, $t5)], expr#7=[1.0:DECIMAL(2, 1)], $f0=[$t6], mType=[$t1], mValue=[$t2], $f3=[$t7], mID=[$t0])
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, H]], fields=[mID, dateTime, mValue, unixDateTime, mType], source=[CsvTableSource(read fields: mID, mType, mValue)])
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
at org.apache.flink.table.plan.Optimizer.optimizePhysicalPlan(Optimizer.scala:170)
at org.apache.flink.table.plan.BatchOptimizer.optimize(BatchOptimizer.scala:57)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:280)
at org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:71)
at StreamTableEnv.main(StreamTableEnv.java:169)
CEP 库和 MATCH_RECOGNIZE 仅在流 API 之上工作(而不是批处理),这意味着您需要使用 StreamTableEnvironment
而不是 BatchTableEnviroment
.
我在 Apache Flink
中创建了一个 BatchTableEnviroment
并创建了一个 Table
对象,我在其中加载了数据。现在我想搜索一些模式。我正在使用 Detecting patterns in Tables CEP library
执行此操作。下面查询的任务是找到 mGroup
中 avgResult
未低于特定阈值的最长时间段。其中 mGroup
是 Integer
值,例如 100、200、300 等。Avgresult
是双精度值。当我编译查询部分时,我没有收到任何错误。当我将 Table
转换为 DataSet<row>
时出现错误。在查询下方,您可以看到错误消息。
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(fbEnv);
Table trendTable = tableEnv.sqlQuery(
" SELECT * " +
" FROM tableAvg " +
" MATCH_RECOGNIZE(" +
" PARTITION BY mType " +
" ORDER BY mGroup " +
" MEASURES " +
" FIRST(A.mGroup) as startGr, " +
" LAST(A.mGroup) as endGr, " +
" A.avgResult as avgRes" +
" ONE ROW PER MATCH " +
" AFTER MATCH SKIP PAST LAST ROW " +
" PATTERN (A+ B) " +
" DEFINE " +
" A AS A.avgResult < 50 " +
") "
);
tableEnv.registerTable("TrendTable", trendTable);
DataSet<Row> result = tableEnv.toDataSet(trendTable, Row.class);
/////////////////////ERROR MESSAGE BELOW
Exception in thread "main" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:
FlinkLogicalMatch(partition=[[]], order=[[2]], outputFields=[[mType, startGr, endGr, avgRes]], allRows=[false], after=[FLAG(SKIP PAST LAST ROW)], pattern=[(PATTERN_QUANTIFIER(_UTF-16LE'A', 1, -1, false), _UTF-16LE'B')], isStrictStarts=[false], isStrictEnds=[false], subsets=[[]], patternDefinitions=[[<(PREV(A.[=12=], 0), 50)]], inputFields=[[sumResult, mType, EXPR]])
FlinkLogicalSort(sort0=[], dir0=[ASC])
FlinkLogicalCalc(expr#0..5=[{inputs}], expr#6=[/($t2, $t3)], expr#7=[99], expr#8=[>($t5, $t7)], sumResult=[$t6], mType=[$t1], EXPR=[$t4], $condition=[$t8])
FlinkLogicalAggregate(group=[{0, 1}], agg#0=[SUM()], agg#1=[SUM()], agg#2=[MAX()], agg#3=[COUNT()])
FlinkLogicalCalc(expr#0..2=[{inputs}], expr#3=[1], expr#4=[-($t0, $t3)], expr#5=[100], expr#6=[/($t4, $t5)], expr#7=[1.0:DECIMAL(2, 1)], $f0=[$t6], mType=[$t1], mValue=[$t2], $f3=[$t7], mID=[$t0])
FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, H]], fields=[mID, dateTime, mValue, unixDateTime, mType], source=[CsvTableSource(read fields: mID, mType, mValue)])
This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.plan.Optimizer.runVolcanoPlanner(Optimizer.scala:245)
at org.apache.flink.table.plan.Optimizer.optimizePhysicalPlan(Optimizer.scala:170)
at org.apache.flink.table.plan.BatchOptimizer.optimize(BatchOptimizer.scala:57)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:280)
at org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl.toDataSet(BatchTableEnvironmentImpl.scala:71)
at StreamTableEnv.main(StreamTableEnv.java:169)
CEP 库和 MATCH_RECOGNIZE 仅在流 API 之上工作(而不是批处理),这意味着您需要使用 StreamTableEnvironment
而不是 BatchTableEnviroment
.