Apache Flink: Cannot resolve field in select clause

I'm just getting started with Apache Flink, and I'm trying to aggregate some values read from a Kafka topic. This is the code I'm using:

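import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class EnvironmentMeasuresJob {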
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.executeSql("CREATE TABLE EnvironmentMeasures (" +
                "`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp'," +
                "`area` STRING," +
                "`sensor` STRING," +
                "`co` DECIMAL(5, 2)," +
                "`pm1` DECIMAL(5, 2)," +
                "`pm25` DECIMAL(5, 2)," +
                "`pm10` DECIMAL(5, 2)," +
                "`temperature` DECIMAL(5, 2)," +
                "`pressure` DECIMAL(5, 2)," +
                "`humidity` DECIMAL(5, 2)," +
                "WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND" +
                ") WITH (" +
                "'connector' = 'kafka'," +
                "'topic' = 'seneca.environmentmeasures.raw'," +
                "'properties.bootstrap.servers' = 'localhost:9092'," +
                "'properties.group.id' = 'env-measures-job'," +
                "'scan.startup.mode' = 'earliest-offset'," +
                "'format' = 'json'," +
                "'json.fail-on-missing-field' = 'false'," +
                "'json.ignore-parse-errors' = 'false'" +
                ")");
        Table environmentMeasures = tEnv.from("EnvironmentMeasures");
        Table aggregatedEnvironmentMeasures = environmentMeasures
                .window(Slide.over(lit(20).seconds())
                        .every(lit(10).seconds())
                        .on($("timestamp"))
                        .as("w"))
                .groupBy($("sensor"), $("w"))
                .select(
                        $("w").end().as("timestamp"),
                        $("area"),
                        $("sensor"),
                        $("co").avg().as("averageCO"),
                        $("pm1").avg().as("averagePM1"),
                        $("pm25").avg().as("averagePM25"),
                        $("pm10").avg().as("averagePM10"),
                        $("temperature").avg().as("averageTemperature"),
                        $("pressure").avg().as("averagePressure"),
                        $("humidity").avg().as("averageHumidity")
                );
    }
}

But when I try to run the code, I get the following exception:

Exception in thread "main" org.apache.flink.table.api.ValidationException: Cannot resolve field [area], input field list:[sensor, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR, EXPR$0].

If I remove "area" from the select, everything works fine. Any idea why this happens? Am I missing something? Thanks, 尤克斯

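I think you need to either group by area as well, or compute some aggregate of area over the rows that fall into a given sensor and window. After groupBy, the select can only reference the grouping keys and aggregates; area is neither, which is why it cannot be resolved. For example, grouping with .groupBy($("area"), $("sensor"), $("w")) makes area available in the select.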
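To see why a non-grouped column is ambiguous, here is a minimal plain-Java sketch of the same rule using the standard Collectors API. It does not use Flink at all: the GroupByDemo and Measure names and the sample values are made up for illustration, and the window is left out to keep it short. Grouping by sensor alone can produce a group whose rows come from several areas, so there is no single area to output; grouping by area and sensor together makes area part of the key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration only; none of these names come from Flink.
final class GroupByDemo {

    // Hypothetical row holding just the columns needed for the illustration.
    static final class Measure {
        final String area;
        final String sensor;
        final double co;

        Measure(String area, String sensor, double co) {
            this.area = area;
            this.sensor = sensor;
            this.co = co;
        }
    }

    // Made-up sample: sensor "s1" reports from two different areas.
    static List<Measure> sample() {
        return Arrays.asList(
                new Measure("north", "s1", 1.0),
                new Measure("north", "s1", 2.0),
                new Measure("south", "s1", 4.0),
                new Measure("south", "s2", 3.0));
    }

    // Grouping by sensor only: one group can span several areas,
    // so there is no single area value to put in the output row.
    static Map<String, Set<String>> areasPerSensor(List<Measure> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                m -> m.sensor,
                TreeMap::new,
                Collectors.mapping(m -> m.area, Collectors.toSet())));
    }

    // Grouping by (area, sensor): area is part of the key,
    // so every group has exactly one area and it can be selected.
    static Map<String, Double> avgCoByAreaAndSensor(List<Measure> rows) {
        return rows.stream().collect(Collectors.groupingBy(
                m -> m.area + "/" + m.sensor,
                TreeMap::new,
                Collectors.averagingDouble(m -> m.co)));
    }

    public static void main(String[] args) {
        System.out.println(areasPerSensor(sample()));
        System.out.println(avgCoByAreaAndSensor(sample()));
    }
}
```

If each sensor only ever reports from one area, the alternative from the answer (aggregating area within the group, for instance by taking its minimum or maximum) would return that single value, but that relies on an assumption about the data that the question does not state.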