Apache Flink:无法解析 select 子句中的字段
Apache Flink: Cannot resolve field in select clause
我开始尝试 Apache Flink,我正在尝试聚合从 kafka 主题中提取的一些值。这是我正在使用的代码:
public class EnvironmentMeasuresJob {
public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE EnvironmentMeasures (" +
"`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp'," +
"`area` STRING," +
"`sensor` STRING," +
"`co` DECIMAL(5, 2)," +
"`pm1` DECIMAL(5, 2)," +
"`pm25` DECIMAL(5, 2)," +
"`pm10` DECIMAL(5, 2)," +
"`temperature` DECIMAL(5, 2)," +
"`pressure` DECIMAL(5, 2)," +
"`humidity` DECIMAL(5, 2)," +
"WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.environmentmeasures.raw'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'env-measures-job'," +
"'scan.startup.mode' = 'earliest-offset'," +
"'format' = 'json'," +
"'json.fail-on-missing-field' = 'false'," +
"'json.ignore-parse-errors' = 'false'" +
")");
Table environmentMeasures = tEnv.from("EnvironmentMeasures");
Table aggregatedEnvironmentMeasures = environmentMeasures
.window(Slide.over(lit(20).seconds())
.every(lit(10).seconds())
.on($("timestamp"))
.as("w"))
.groupBy($("sensor"), $("w"))
.select(
$("w").end().as("timestamp"),
$("area"),
$("sensor"),
$("co").avg().as("averageCO"),
$("pm1").avg().as("averagePM1"),
$("pm25").avg().as("averagePM25"),
$("pm10").avg().as("averagePM10"),
$("temperature").avg().as("averageTemperature"),
$("pressure").avg().as("averagePressure"),
$("humidity").avg().as("averageHumidity")
);
}
}
但是当我尝试执行代码时出现以下异常:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [area], input field list:[sensor, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR[=12=]].
如果我从 select 中删除“区域”,一切正常。知道这是怎么发生的吗?我错过了什么吗?
谢谢
尤克斯
我认为您需要按区域分组或计算给定传感器和 window 中包含的区域的一些聚合。
我开始尝试 Apache Flink,我正在尝试聚合从 kafka 主题中提取的一些值。这是我正在使用的代码:
public class EnvironmentMeasuresJob {
public static void main(String[] args) {
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
final TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE EnvironmentMeasures (" +
"`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp'," +
"`area` STRING," +
"`sensor` STRING," +
"`co` DECIMAL(5, 2)," +
"`pm1` DECIMAL(5, 2)," +
"`pm25` DECIMAL(5, 2)," +
"`pm10` DECIMAL(5, 2)," +
"`temperature` DECIMAL(5, 2)," +
"`pressure` DECIMAL(5, 2)," +
"`humidity` DECIMAL(5, 2)," +
"WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'seneca.environmentmeasures.raw'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'env-measures-job'," +
"'scan.startup.mode' = 'earliest-offset'," +
"'format' = 'json'," +
"'json.fail-on-missing-field' = 'false'," +
"'json.ignore-parse-errors' = 'false'" +
")");
Table environmentMeasures = tEnv.from("EnvironmentMeasures");
Table aggregatedEnvironmentMeasures = environmentMeasures
.window(Slide.over(lit(20).seconds())
.every(lit(10).seconds())
.on($("timestamp"))
.as("w"))
.groupBy($("sensor"), $("w"))
.select(
$("w").end().as("timestamp"),
$("area"),
$("sensor"),
$("co").avg().as("averageCO"),
$("pm1").avg().as("averagePM1"),
$("pm25").avg().as("averagePM25"),
$("pm10").avg().as("averagePM10"),
$("temperature").avg().as("averageTemperature"),
$("pressure").avg().as("averagePressure"),
$("humidity").avg().as("averageHumidity")
);
}
}
但是当我尝试执行代码时出现以下异常:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [area], input field list:[sensor, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR[=12=]].
如果我从 select 中删除“区域”,一切正常。知道这是怎么发生的吗?我错过了什么吗? 谢谢 尤克斯
我认为您需要按区域分组或计算给定传感器和 window 中包含的区域的一些聚合。