Flink Table API:GROUP BY in SQL 执行抛出 org.apache.flink.table.api.TableException
Flink Table API: GROUP BY in SQL Execution throws org.apache.flink.table.api.TableException
我有一个非常简化的用例:我想使用 Apache Flink (1.11) 从 Kafka 主题中读取数据(我们称之为 source_topic),计算其中的一个属性(称为 b),然后将结果写入另一个 Kafka 主题 (result_topic)。
到目前为止我有以下代码:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)`
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.2.jar")
source_ddl = """
CREATE TABLE source_table(
a STRING,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'node-1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
)
"""
sink_ddl = """
CREATE TABLE result_table(
b INT,
result BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'result_topic',
'properties.bootstrap.servers' = 'node-1:9092',
'format' = 'csv'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("INSERT INTO result_table SELECT b,COUNT(b) FROM source_table GROUP BY b")
t_env.execute("Kafka_Flink_job")
if __name__ == '__main__':
log_processing()
但是当我执行它时,出现以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.result_table' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[b], select=[b, COUNT(b) AS EXPR])
我可以使用简单的 SELECT
语句将数据写入 Kafka 主题。但是只要我添加 GROUP BY
子句,就会抛出上面的异常。我按照 Flink 的文档使用 Table API with SQL for Python: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#sql
非常感谢任何帮助,我是流处理和 Flink 的新手。谢谢!
使用 GROUP BY
子句将生成一个更新流,从 Flink 1.11 开始,Kafka 连接器不支持它。另一方面,当您使用没有任何聚合的简单 SELECT
语句时,结果流是仅追加的(这就是为什么您可以毫无问题地使用它的原因)。
Flink 1.12 即将发布,它包括一个新的 upsert Kafka 连接器(FLIP-149,如果你好奇的话)这将允许你在 PyFlink 中也执行这种类型的操作(即Python Table API).
我有一个非常简化的用例:我想使用 Apache Flink (1.11) 从 Kafka 主题中读取数据(我们称之为 source_topic),计算其中的一个属性(称为 b),然后将结果写入另一个 Kafka 主题 (result_topic)。
到目前为止我有以下代码:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
def log_processing():
env = StreamExecutionEnvironment.get_execution_environment()
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)`
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)
t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///opt/flink-1.11.2/lib/flink-sql-connector-kafka_2.11-1.11.2.jar")
source_ddl = """
CREATE TABLE source_table(
a STRING,
b INT
) WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = 'node-1:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true'
)
"""
sink_ddl = """
CREATE TABLE result_table(
b INT,
result BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'result_topic',
'properties.bootstrap.servers' = 'node-1:9092',
'format' = 'csv'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
t_env.execute_sql("INSERT INTO result_table SELECT b,COUNT(b) FROM source_table GROUP BY b")
t_env.execute("Kafka_Flink_job")
if __name__ == '__main__':
log_processing()
但是当我执行它时,出现以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.result_table' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[b], select=[b, COUNT(b) AS EXPR])
我可以使用简单的 SELECT
语句将数据写入 Kafka 主题。但是只要我添加 GROUP BY
子句,就会抛出上面的异常。我按照 Flink 的文档使用 Table API with SQL for Python: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#sql
非常感谢任何帮助,我是流处理和 Flink 的新手。谢谢!
使用 GROUP BY
子句将生成一个更新流,从 Flink 1.11 开始,Kafka 连接器不支持它。另一方面,当您使用没有任何聚合的简单 SELECT
语句时,结果流是仅追加的(这就是为什么您可以毫无问题地使用它的原因)。
Flink 1.12 即将发布,它包括一个新的 upsert Kafka 连接器(FLIP-149,如果你好奇的话)这将允许你在 PyFlink 中也执行这种类型的操作(即Python Table API).