使用 JDBC SInk 连接器处理要在 MySQL 中复制的 ksqlDB v0.11 复合键(表)

Handling ksqlDB v0.11 composite key (tables) to replicate in MySQL using JDBC SInk connector

我正在使用 ksqlDB 版本 0.11(目前我无法升级到更新的版本),并且愿意使用 JDBC 接收器连接器将 TABLE 数据复制到 MySQL。 ksqlDB v0.11 不支持多个 TABLE 键,我的数据需要使用多个 GROUP BY 表达式进行分组。

使用这条语句我创建了 table:

    CREATE TABLE estads AS SELECT 
                STID AS stid, 
                ASIG AS asig, 
                COUNT(*) AS np, 
                MIN(NOTA) AS min, 
                MAX(NOTA) AS max, 
                AVG(NOTA) AS med, 
                LATEST_BY_OFFSET(FECHREG) AS fechreg 
        FROM estads_stm GROUP BY stid, asig EMIT CHANGES;

生成的 table 具有以下架构:

Name                 : ESTADS
 Field      | Type                           
---------------------------------------------
 KSQL_COL_0 | VARCHAR(STRING)  (primary key) 
 NP         | BIGINT                         
 MIN        | DOUBLE                         
 MAX        | DOUBLE                         
 MED        | DOUBLE                         
 FECHREG    | VARCHAR(STRING) 

如您所见,两个主键(stid 和 asig)已合并到一个名为 KSQL_COL_0 的字段中,这是 0.11 版的预期行为。问题是我需要使用 JDBC 接收器连接器将数据复制到具有以下架构的 MySQL table 中:

+---------+--------------+------+-----+-------------------+-----------------------------+
| Field   | Type         | Null | Key | Default           | Extra                       |
+---------+--------------+------+-----+-------------------+-----------------------------+
| stid    | varchar(15)  | NO   | PRI | NULL              |                             |
| asig    | varchar(10)  | NO   | PRI | NULL              |                             |
| np      | smallint(6)  | YES  |     | NULL              |                             |
| min     | decimal(5,2) | YES  |     | NULL              |                             |
| max     | decimal(5,2) | YES  |     | NULL              |                             |
| med     | decimal(5,2) | YES  |     | NULL              |                             |
| fechreg | timestamp    | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------+--------------+------+-----+-------------------+-----------------------------+

我不知道如何“取消合并”自动生成的 KSQL_COL_0 以告诉 JDBC stid 和 asig 都是 MySQL [=37 中的主键=].任何想法如何管理这个?我知道自从 ksqlDB 版本 0.15 这不再是问题,因为 ksqlDB tables 支持多个键,但正如我所说,升级不是我的选择。

谢谢!

我想通了。

基本上您需要在 table 创建查询中使用 AS_VALUE() 子句。通过这种方式,您可以在新列中复制两个私钥的值,同时在其自己的列中也有新创建的私钥。然后,只需在JCBD Sink Connector中指定获取除新建私钥外的所有列的值。

CREATE TABLE estads AS SELECT 
                STID AS k1, 
                ASIG AS k2, 
                AS_VALUE(STID) AS stid,
                AS_VALUE(ASIG) AS asig,
                COUNT(*) AS np, 
                MIN(NOTA) AS min, 
                MAX(NOTA) AS max, 
                AVG(NOTA) AS med, 
                LATEST_BY_OFFSET(FECHREG) AS fechreg 
        FROM estads_stm GROUP BY k1, k2 EMIT CHANGES;