org.apache.flink.api.table.TableException: "Alias on field reference expression expected"
I am using the Table API of Apache Flink, version 1.1-SNAPSHOT, to evaluate SQL queries on streams.
Here is my code:
private static final int MAX_RACK_ID = 10;
private static final long PAUSE = 100;
private static final double TEMP_STD = 20;
private static final double TEMP_MEAN = 80;

public static void main(String[] args)
{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    DataStream<MonitoringEvent> dstream = env.addSource(new MonitoringEventSource(MAX_RACK_ID, PAUSE, TEMP_STD, TEMP_MEAN));
    tableEnv.registerDataStream("TemperatureData", dstream, "rackid,temperature,timestamp");
    Table tab1 = tableEnv.sql("select STREAM rackid,temperature,timestamp from TemperatureData where temperature>=100");
    DataStream<TemperatureEvent> tempstream = tableEnv.toDataStream(tab1, TemperatureEvent.class);
    tempstream.print();
}
When I run this program, it throws the following exception:
Exception in thread "main" org.apache.flink.api.table.TableException: Alias on field reference expression expected.
at org.apache.flink.api.table.TableEnvironment$$anonfun.apply(TableEnvironment.scala:299)
at org.apache.flink.api.table.TableEnvironment$$anonfun.apply(TableEnvironment.scala:292)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.flink.api.table.TableEnvironment.getFieldInfo(TableEnvironment.scala:292)
at org.apache.flink.api.table.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:212)
at org.apache.flink.api.java.table.StreamTableEnvironment.registerDataStream(StreamTableEnvironment.scala:130)
at com.yash.flink.Program.main(Program.java:31)
I have a few questions:
- What is the way to write SQL queries on streams with Apache Flink's Table API?
- How can I implement this query in Flink?
- Is this a bug in Flink's Table API?
You have found a limitation/bug in the Table API. The problem is caused by registering the DataStream<MonitoringEvent> as a table. You should do
tableEnv.registerDataStream(
    "TemperatureData",
    dstream,
    "rackid AS rackid, temperature AS temperature, timestamp AS timestamp"
);
to make it work. I'll make sure the issue is fixed before Stream SQL is released in Flink 1.1.0.
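If the stream has many fields, writing each "name AS name" pair by hand gets tedious. Below is a minimal sketch of a plain-Java helper that builds the aliased field string used in the workaround above. Note that `AliasFields` and `aliasFields` are hypothetical names made up for this illustration, not part of Flink; the helper only does string concatenation and makes no Flink calls itself.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class AliasFields {

    // Hypothetical helper (not part of Flink): turns each field name into
    // "name AS name" and joins the pairs with ", ".
    static String aliasFields(String... fieldNames) {
        return Arrays.stream(fieldNames)
                .map(name -> name + " AS " + name)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // Prints the same field expression as in the workaround above.
        System.out.println(aliasFields("rackid", "temperature", "timestamp"));
    }
}
```

Assuming the registration call from the workaround, the result could then be passed as the third argument, e.g. tableEnv.registerDataStream("TemperatureData", dstream, AliasFields.aliasFields("rackid", "temperature", "timestamp")).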