Flink Table API join tables in a streaming mode

I have registered two JDBC tables in my Flink application, and I want to join them and convert the result into a regular DataStream. But when I join the tables I get this error:

Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[((id = asset_id) AND (owner_id = owner_id0))], select=[owner_id, id, poi_id, gateway_id, owner_id0, asset_id, tag_id, role], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
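The error says the problem directly: a regular (non-windowed) join between two streaming tables produces a changelog containing UPDATE and DELETE rows, while `toDataStream` only accepts insert-only tables. One way around this, assuming Flink 1.14+ and reusing `tableEnv` and `assetAssociationTable` from the code below, is to consume the changelog with `toChangelogStream` instead, which yields `Row`s tagged with a `RowKind` (sketch only, not a drop-in fix):

```scala
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.types.Row

// toChangelogStream accepts updating tables, unlike toDataStream.
// Each emitted Row carries a RowKind (INSERT, UPDATE_BEFORE,
// UPDATE_AFTER, DELETE) that downstream operators must handle.
val changelog: DataStream[Row] =
  tableEnv.toChangelogStream(assetAssociationTable)
```

The trade-off is that downstream logic now has to inspect `row.getKind` and decide what a retraction means for the application, instead of seeing a simple append-only stream.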

Code

    val assetTableDescriptor = TableDescriptor.forConnector("jdbc")
      .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
      .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
      .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
      .option(JdbcConnectorOptions.TABLE_NAME, "asset")
      .schema(Schema.newBuilder()
        .column("owner_id", DataTypes.STRING)
        .column("id", DataTypes.STRING)
        .column("poi_id", DataTypes.STRING)
        .column("gateway_id", DataTypes.STRING)
        .column("internal_status", DataTypes.STRING)
        .build())
      .build()

    val assetTagTableDescriptor = TableDescriptor.forConnector("jdbc")
      .option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
      .option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
      .option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
      .option(JdbcConnectorOptions.TABLE_NAME, "asset_tag")
      .schema(Schema.newBuilder()
        .column("owner_id", DataTypes.STRING)
        .column("asset_id", DataTypes.STRING)
        .column("tag_id", DataTypes.STRING)
        .column("role", DataTypes.STRING)
        .build())
      .build()

    tableEnv.createTemporaryTable("asset", assetTableDescriptor)
    tableEnv.createTemporaryTable("asset_tag", assetTagTableDescriptor)


    val assetTable: Table = tableEnv.from(assetTableDescriptor)
      .select($"owner_id" as "asset_owner_id", $"id", $"poi_id", $"gateway_id", $"internal_status")

    val assetTagTable: Table = tableEnv.from(assetTagTableDescriptor)
      .select($"owner_id", $"asset_id", $"tag_id", $"role")

    val assetAssociationTable = assetTable
      .leftOuterJoin(assetTagTable, $"id" === $"asset_id" and $"asset_owner_id" === $"owner_id")
      .select($"asset_owner_id", $"id", $"poi_id", $"gateway_id", $"tag_id", $"role")

    val assetTableStream: DataStream[AssetOperationKafkaMsg] = tableEnv
      .toDataStream(assetAssociationTable, classOf[JdbcAssetState])
      .flatMap(new JdbcAssetStateDataMapper)

In BATCH mode this runs fine, but I need STREAMING mode because assetTableStream has to be joined with another stream in my application.

From what I found in the Flink documentation, it looks like I need to use a lookup join, but I can't figure out how to do it with the Table API (not SQL).
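For reference, the documented form of a lookup join is the SQL `FOR SYSTEM_TIME AS OF` syntax, and that SQL can still be embedded in a Table API program via `sqlQuery`. A hedged sketch, assuming a probe-side table `orders` (an illustrative name, not from the code above) that exposes a processing-time attribute `proc_time`:

```scala
import org.apache.flink.table.api.Table

// Sketch: lookup join against the JDBC table "asset" at processing time.
// "orders" and "proc_time" are illustrative; the probe side must be a
// streaming table with a declared processing-time attribute.
val enriched: Table = tableEnv.sqlQuery(
  """
    |SELECT o.id, a.poi_id, a.gateway_id
    |FROM orders AS o
    |JOIN asset FOR SYSTEM_TIME AS OF o.proc_time AS a
    |ON o.id = a.id
  """.stripMargin)
```

With this pattern each probe row queries the JDBC table at processing time, so the result is insert-only and `toDataStream` should accept it.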

Any small example of joining two JDBC tables and converting the result into a DataStream would be great.

Two hints: