在 Flink 程序之间共享动态表
Share dynamic tables between Flink programs
我有一个 Flink 作业,它从数据库变更日志流创建动态 table。 table 定义如下所示:
tableEnv.sqlUpdate("""
CREATE TABLE some_table_name (
id INTEGER,
name STRING,
created_at BIGINT,
updated_at BIGINT
)
WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.zookeeper.connect' = 'localhost:2181',
'properties.group.id' = 'group_1',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true'
)
""")
当试图在同一集群上的另一个 运行 Flink 应用程序中引用 table 时,我的程序 returns 出现错误:SqlValidatorException: Object 'some_table_name' not found
。是否可以以某种方式注册 table 以便其他程序可以使用它?例如在这样的语句中:
tableEnv.sqlQuery("""
SELECT count(*) FROM some_table_name
""").execute().print()
请注意,Flink 中的 table 不包含任何数据。例如,另一个 Flink 应用程序可以独立创建另一个 table 由相同的 Kafka 主题支持。因此,不在应用程序之间共享 table 并不像您预期的那样悲惨。
但您可以通过将 table 存储在外部 目录 中来共享它们。例如,您可以为此目的使用 Apache Hive 目录。有关详细信息,请参阅 the docs。
我有一个 Flink 作业,它从数据库变更日志流创建动态 table。 table 定义如下所示:
tableEnv.sqlUpdate("""
CREATE TABLE some_table_name (
id INTEGER,
name STRING,
created_at BIGINT,
updated_at BIGINT
)
WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.zookeeper.connect' = 'localhost:2181',
'properties.group.id' = 'group_1',
'format' = 'debezium-json',
'debezium-json.schema-include' = 'true'
)
""")
当试图在同一集群上的另一个 运行 Flink 应用程序中引用 table 时,我的程序 returns 出现错误:SqlValidatorException: Object 'some_table_name' not found
。是否可以以某种方式注册 table 以便其他程序可以使用它?例如在这样的语句中:
tableEnv.sqlQuery("""
SELECT count(*) FROM some_table_name
""").execute().print()
请注意,Flink 中的 table 不包含任何数据。例如,另一个 Flink 应用程序可以独立创建另一个 table 由相同的 Kafka 主题支持。因此,不在应用程序之间共享 table 并不像您预期的那样悲惨。
但您可以通过将 table 存储在外部 目录 中来共享它们。例如,您可以为此目的使用 Apache Hive 目录。有关详细信息,请参阅 the docs。