I'm getting this error when running the PyFlink code below. It computes per-channel (ch[x]) averages from a Kafka source using Apache Flink (PyFlink), and I think I have imported all the necessary libraries:

```python
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *


def create_input():
    return """
    CREATE TABLE input(
        `gw_id` VARCHAR,
        `ch1` BIGINT,
        `ch2` BIGINT,
        `ch3` BIGINT,
        `ch4` BIGINT,
        `ch5` BIGINT,
        `ch6` BIGINT,
        `t` TIMESTAMP_LTZ(3),
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'energymeter.raw',
        'properties.bootstrap.servers' = '192.168.0.34:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json',
    )
    """


@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    return x*12*2


@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
    return x/500


def create_output():
    return """
    CREATE TABLE output (
        `gw_id` VARCHAR,
        `ch1` BIGINT,
        `ch2` BIGINT,
        `ch3` BIGINT,
        `ch4` BIGINT,
        `ch5` BIGINT,
        `ch6` BIGINT,
        `ch1_mod` BIGINT,
        `ch2_mod` BIGINT,
        `ch3_mod` BIGINT,
        `ch4_mod` BIGINT,
        `ch5_mod` BIGINT,
        `ch6_mod` BIGINT,
        `t` TIMESTAMP_LTZ(3)
    ) WITH (
        'connector' = 'kafka'
        'topic' = 'energymeter.processed',
        'properties.bootstrap.servers' = '192.168.0.34:9092',
        'format' = 'json'
    )
    """


env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()
```
I added the SQL Kafka connector flink-sql-connector-kafka_2.11-1.14.4.jar, but it doesn't seem to have any effect. The error is:

```
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
... 13 more
```
Your program has several problems. For example:

- In the `output` DDL a comma is missing after `'connector' = 'kafka'`, while the `input` DDL has extra trailing commas after `` `t` TIMESTAMP_LTZ(3)`` and `'format' = 'json'`. The parse exception ("Encountered \")\" at line 11") points at exactly that trailing comma before `) WITH (` in the input DDL.
- Python UDFs should be registered with `create_temporary_function`, not passed to `execute_sql` (a minimal sketch follows this list).
- The field order in the `SELECT` clause does not match the sink table `output` definition.
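Here is a minimal, self-contained sketch of that registration pattern (the UDF body is the one from your code; nothing else is assumed):

```python
from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    return x * 12 * 2

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# execute_sql() only accepts SQL strings, so passing the UDF object to it
# fails; create_temporary_function() registers it under a SQL-callable name.
t_env.create_temporary_function("average_power", average_power)

# The name can now be used in any SQL statement on this environment, e.g.:
# t_env.execute_sql("SELECT average_power(ch1) FROM input")
```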
I made the following changes:

```python
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *


def create_input():
    # Source table: the trailing commas after the last column and the
    # last WITH option have been removed.
    return """
    CREATE TABLE input(
        `gw_id` VARCHAR,
        `ch1` BIGINT,
        `ch2` BIGINT,
        `ch3` BIGINT,
        `ch4` BIGINT,
        `ch5` BIGINT,
        `ch6` BIGINT,
        `t` TIMESTAMP_LTZ(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'energymeter.raw',
        'properties.bootstrap.servers' = '192.168.0.34:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
    """


@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    return x*12*2


# Note: declared BIGINT, but x/500 is float division in Python 3; this UDF
# is not referenced in the INSERT below anyway.
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
    return x/500


def create_output():
    # Sink table: the missing comma after 'connector' = 'kafka' is fixed.
    return """
    CREATE TABLE output (
        `gw_id` VARCHAR,
        `ch1` BIGINT,
        `ch2` BIGINT,
        `ch3` BIGINT,
        `ch4` BIGINT,
        `ch5` BIGINT,
        `ch6` BIGINT,
        `ch1_mod` BIGINT,
        `ch2_mod` BIGINT,
        `ch3_mod` BIGINT,
        `ch4_mod` BIGINT,
        `ch5_mod` BIGINT,
        `ch6_mod` BIGINT,
        `t` TIMESTAMP_LTZ(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'energymeter.processed',
        'properties.bootstrap.servers' = '192.168.0.34:9092',
        'format' = 'json'
    )
    """


env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
# Register the Python UDFs under SQL-callable names instead of passing
# the UDF objects to execute_sql().
table_env.create_temporary_function("average_power", average_power)
table_env.create_temporary_function("energy_consumption", energy_consumption)
table_env.execute_sql(create_output())
# Column order now matches the sink table definition.
table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()
```