Flink Table API 以流方式连接表
Flink Table API join tables in a streaming mode
我在我的 flink 应用程序中注册了两个 jdbc table,我想加入它们并将结果转换为常规数据流。
但是当我加入 tables 时出现错误
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[((id = asset_id) AND (owner_id = owner_id0))], select=[owner_id, id, poi_id, gateway_id, owner_id0, asset_id, tag_id, role], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
代码
// --- Source table 1: `asset` -------------------------------------------------
// JDBC (scan) source descriptor for the `asset` table.
// NOTE(review): the plain "jdbc" connector is a bounded scan source — in
// STREAMING mode it is read once, not continuously; confirm this matches intent.
val assetTableDescriptor = TableDescriptor.forConnector("jdbc")
.option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
.option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
.option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
.option(JdbcConnectorOptions.TABLE_NAME, "asset")
.schema(Schema.newBuilder()
// NOTE(review): `DataTypes.STRING` relies on Scala auto-application of the
// 0-arg Java factory method; the explicit `DataTypes.STRING()` form is safer.
.column("owner_id", DataTypes.STRING)
.column("id", DataTypes.STRING)
.column("poi_id", DataTypes.STRING)
.column("gateway_id", DataTypes.STRING)
.column("internal_status", DataTypes.STRING)
.build())
.build()
// --- Source table 2: `asset_tag` ---------------------------------------------
val assetTagTableDescriptor = TableDescriptor.forConnector("jdbc")
.option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
.option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
.option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
.option(JdbcConnectorOptions.TABLE_NAME, "asset_tag")
.schema(Schema.newBuilder()
.column("owner_id", DataTypes.STRING)
.column("asset_id", DataTypes.STRING)
.column("tag_id", DataTypes.STRING)
.column("role", DataTypes.STRING)
.build())
.build()
// The names registered here ("asset"/"asset_tag") are never used below:
// `tableEnv.from(descriptor)` registers *anonymous* inline tables instead.
// Either drop these two lines or switch to `tableEnv.from("asset")` below.
tableEnv.createTemporaryTable("asset", assetTableDescriptor)
tableEnv.createTemporaryTable("asset_tag", assetTagTableDescriptor)
// Rename `owner_id` up front to avoid the `owner_id0` auto-alias that the
// planner otherwise generates for the duplicate column name in the join.
val assetTable: Table = tableEnv.from(assetTableDescriptor)
.select($"owner_id" as "asset_owner_id", $"id", $"poi_id", $"gateway_id", $"internal_status")
val assetTagTable: Table = tableEnv.from(assetTagTableDescriptor)
.select($"owner_id", $"asset_id", $"tag_id", $"role")
// Regular left outer join on (id = asset_id AND asset_owner_id = owner_id).
// In STREAMING mode, with NoUniqueKey inputs, the planner treats the result as
// an updating (changelog) table — the root cause of the TableException quoted
// in the question above.
val assetAssociationTable = assetTable
.leftOuterJoin(assetTagTable, $"id" === $"asset_id" and $"asset_owner_id" === $"owner_id")
.select($"asset_owner_id", $"id", $"poi_id", $"gateway_id", $"tag_id", $"role")
// BUG (streaming): `toDataStream` only accepts append-only (insert-only)
// tables, so this line throws for the updating join result above. Use
// `tableEnv.toChangelogStream(assetAssociationTable)` instead and adapt
// `JdbcAssetStateDataMapper` to consume `Row` changelog records, or
// restructure as a lookup/interval join so the result stays append-only.
val assetTableStream: DataStream[AssetOperationKafkaMsg] = tableEnv
.toDataStream(assetAssociationTable, classOf[JdbcAssetState])
.flatMap(new JdbcAssetStateDataMapper)
在 BATCH 模式下它运行良好,但我需要在 STREAMING 模式下将 assetTableStream
与我的应用程序中的另一个流进行连接(join)。
根据我在 flink 文档中发现的内容,看起来我需要使用 Lookup Join
但不知道如何使用 Table API(而不是 SQL)来实现。
有任何连接两个 jdbc table 并将其转换为数据流的小例子就太棒了
两个提示:
- 如果您有更新的动态 table,为反映这些更新而创建的流是 changelog(不是 append-only)。这意味着您必须使用
toChangelogStream
方法(而不是您示例中的 toDataStream
)
- 由于要维护的状态大小,您的连接有问题。如果可能,请考虑使用间隔连接 (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tableapi/#interval-join)。
我在我的 flink 应用程序中注册了两个 jdbc table,我想加入它们并将结果转换为常规数据流。 但是当我加入 tables 时出现错误
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.Unregistered_DataStream_Sink_1' doesn't support consuming update and delete changes which is produced by node Join(joinType=[LeftOuterJoin], where=[((id = asset_id) AND (owner_id = owner_id0))], select=[owner_id, id, poi_id, gateway_id, owner_id0, asset_id, tag_id, role], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
代码
// --- Source table 1: `asset` -------------------------------------------------
// JDBC (scan) source descriptor for the `asset` table.
// NOTE(review): the plain "jdbc" connector is a bounded scan source — in
// STREAMING mode it is read once, not continuously; confirm this matches intent.
val assetTableDescriptor = TableDescriptor.forConnector("jdbc")
.option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
.option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
.option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
.option(JdbcConnectorOptions.TABLE_NAME, "asset")
.schema(Schema.newBuilder()
// NOTE(review): `DataTypes.STRING` relies on Scala auto-application of the
// 0-arg Java factory method; the explicit `DataTypes.STRING()` form is safer.
.column("owner_id", DataTypes.STRING)
.column("id", DataTypes.STRING)
.column("poi_id", DataTypes.STRING)
.column("gateway_id", DataTypes.STRING)
.column("internal_status", DataTypes.STRING)
.build())
.build()
// --- Source table 2: `asset_tag` ---------------------------------------------
val assetTagTableDescriptor = TableDescriptor.forConnector("jdbc")
.option(JdbcConnectorOptions.URL, s"jdbc:mysql://${memsqlConfig("host")}:${memsqlConfig("port")}/${memsqlConfig("dbname")}")
.option(JdbcConnectorOptions.USERNAME, memsqlConfig("user"))
.option(JdbcConnectorOptions.PASSWORD, memsqlConfig("pass"))
.option(JdbcConnectorOptions.TABLE_NAME, "asset_tag")
.schema(Schema.newBuilder()
.column("owner_id", DataTypes.STRING)
.column("asset_id", DataTypes.STRING)
.column("tag_id", DataTypes.STRING)
.column("role", DataTypes.STRING)
.build())
.build()
// The names registered here ("asset"/"asset_tag") are never used below:
// `tableEnv.from(descriptor)` registers *anonymous* inline tables instead.
// Either drop these two lines or switch to `tableEnv.from("asset")` below.
tableEnv.createTemporaryTable("asset", assetTableDescriptor)
tableEnv.createTemporaryTable("asset_tag", assetTagTableDescriptor)
// Rename `owner_id` up front to avoid the `owner_id0` auto-alias that the
// planner otherwise generates for the duplicate column name in the join.
val assetTable: Table = tableEnv.from(assetTableDescriptor)
.select($"owner_id" as "asset_owner_id", $"id", $"poi_id", $"gateway_id", $"internal_status")
val assetTagTable: Table = tableEnv.from(assetTagTableDescriptor)
.select($"owner_id", $"asset_id", $"tag_id", $"role")
// Regular left outer join on (id = asset_id AND asset_owner_id = owner_id).
// In STREAMING mode, with NoUniqueKey inputs, the planner treats the result as
// an updating (changelog) table — the root cause of the TableException quoted
// in the question above.
val assetAssociationTable = assetTable
.leftOuterJoin(assetTagTable, $"id" === $"asset_id" and $"asset_owner_id" === $"owner_id")
.select($"asset_owner_id", $"id", $"poi_id", $"gateway_id", $"tag_id", $"role")
// BUG (streaming): `toDataStream` only accepts append-only (insert-only)
// tables, so this line throws for the updating join result above. Use
// `tableEnv.toChangelogStream(assetAssociationTable)` instead and adapt
// `JdbcAssetStateDataMapper` to consume `Row` changelog records, or
// restructure as a lookup/interval join so the result stays append-only.
val assetTableStream: DataStream[AssetOperationKafkaMsg] = tableEnv
.toDataStream(assetAssociationTable, classOf[JdbcAssetState])
.flatMap(new JdbcAssetStateDataMapper)
在 BATCH 模式下它运行良好,但我需要在 STREAMING 模式下将 assetTableStream
与我的应用程序中的另一个流进行连接(join)。
根据我在 flink 文档中发现的内容,看起来我需要使用 Lookup Join
但不知道如何使用 Table API(而不是 SQL)来实现。
有任何连接两个 jdbc table 并将其转换为数据流的小例子就太棒了
两个提示:
- 如果您有更新的动态 table,为反映这些更新而创建的流是 changelog(不是 append-only)。这意味着您必须使用
toChangelogStream
方法(而不是您示例中的 toDataStream)。
- 由于要维护的状态大小,您的连接有问题。如果可能,请考虑使用间隔连接 (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/tableapi/#interval-join)。