在 Flink 程序之间共享动态表

Share dynamic tables between Flink programs

我有一个 Flink 作业,它从数据库变更日志流创建动态 table。 table 定义如下所示:

tableEnv.sqlUpdate("""
      CREATE TABLE some_table_name (
          id INTEGER,
          name STRING,
          created_at BIGINT,
          updated_at BIGINT
      )
      WITH (
          'connector' = 'kafka',
          'topic' = 'topic',
          'properties.bootstrap.servers' = 'localhost:9092',
          'properties.zookeeper.connect' = 'localhost:2181',
          'properties.group.id' = 'group_1',
          'format' = 'debezium-json',
          'debezium-json.schema-include' = 'true'
      )
    """)

当试图在同一集群上的另一个 运行 Flink 应用程序中引用 table 时,我的程序 returns 出现错误:SqlValidatorException: Object 'some_table_name' not found。是否可以以某种方式注册 table 以便其他程序可以使用它?例如在这样的语句中:

  tableEnv.sqlQuery("""
    SELECT count(*) FROM some_table_name
  """).execute().print()

请注意,Flink 中的 table 不包含任何数据。例如,另一个 Flink 应用程序可以独立创建另一个 table 由相同的 Kafka 主题支持。因此,不在应用程序之间共享 table 并不像您预期​​的那样悲惨。

但您可以通过将 table 存储在外部 目录 中来共享它们。例如,您可以为此目的使用 Apache Hive 目录。有关详细信息,请参阅 the docs