I'm getting this error when running the PyFlink code below. The code is supposed to compute the average power of each ch[x] from a Kafka source using Apache Flink (PyFlink), and I think I have imported all the necessary libraries.

```python
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3),
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json',
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  return x*12*2
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  return x/500
def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka'
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()

```

I added the SQL Kafka connector jar flink-sql-connector-kafka_2.11-1.14.4.jar, but it doesn't seem to make any difference. The error is:

```
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
        at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
        at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
        ... 13 more
```

There are quite a few problems in your program, for example:

  • a missing comma after 'connector' = 'kafka' in the output table's WITH clause
  • extra trailing commas after `t` TIMESTAMP_LTZ(3), and after 'format' = 'json', in the input table's DDL
  • Python UDFs should be registered with create_temporary_function instead of being passed to execute_sql
  • the column order in the SELECT clause doesn't match the sink table output's definition (fixed in the INSERT below)

I made the following changes:

```python
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json'
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  return x*12*2

@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
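  # note: x/500 is true division in Python 3 and yields a float;
  # use x // 500 if a BIGINT result is intended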
  return x/500

def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
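# Register the Python UDFs with create_temporary_function instead of execute_sql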
table_env.create_temporary_function("average_power", average_power)
table_env.create_temporary_function("energy_consumption", energy_consumption)
table_env.execute_sql(create_output())
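# Column order in the SELECT now matches the sink table's schema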
table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()