"register" 是 Ksql 中的保留关键字吗?如果是,我如何 select 具有该名称的字段

Is "register" a reserved keyword in Ksql and if so how can I select a field with that name

我正在学习 Confluent 平台(Kafka、Ksql 等)。我正在使用 Debezium 和 Kafka Connect 将数据流式传输到 Kafka 主题中。我的数据库 table "log" 中的字段之一称为 "register",它是添加记录时的时间戳。

参考table日志结构(在源MySQL数据库中)如下:

CREATE TABLE `log` (
  `code` varchar(9) NOT NULL,
  `register` datetime NOT NULL,
  `entry` mediumtext NOT NULL,
  PRIMARY KEY (`code`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1

我正在使用以下按预期工作的配置将两个数据库中 "log" table 中的数据流式传输到单个 Kafka 主题中。

"transforms.topicRoute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.topicRoute.regex": "([^.]+)\.([^.]+)\.([^.]+)",
"transforms.topicRoute.replacement": "merged.",

我正在尝试建立一个 KSQL 流,该流创建一个新密钥,该密钥是源数据库(来自 Debezium 生成的元数据)和日志 table 中的代码字段的串联,以及table 中的其余字段。这样做的目的是派生密钥在发送到接收器时是完全唯一的(当前连接到另一个 MySQL 包含单个日志 table 的数据库,其内容应该是两个源的合并副本数据库的日志 tables)

我正在尝试 运行 的查询是:

SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;

但是出现以下错误:

line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE', 'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE', 'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY', 'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER, QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Statement: SELECT source->db + '.' + after->code AS KeyValue, after->register, after->entry FROM MERGED_LOG LIMIT 1;
Caused by: line 1:59: mismatched input 'register' expecting {'INTEGER', 'DATE',
        'TIME', 'TIMESTAMP', 'INTERVAL', 'YEAR', 'MONTH', 'DAY', 'HOUR', 'MINUTE',
        'SECOND', 'ZONE', 'PARTITION', 'STRUCT', 'EXPLAIN', 'ANALYZE', 'TYPE', 'SHOW',
        'TABLES', 'COLUMNS', 'COLUMN', 'PARTITIONS', 'FUNCTIONS', 'FUNCTION', 'ARRAY',
        'MAP', 'SET', 'RESET', 'SESSION', 'IF', IDENTIFIER, DIGIT_IDENTIFIER,
        QUOTED_IDENTIFIER, BACKQUOTED_IDENTIFIER}
Caused by: org.antlr.v4.runtime.InputMismatchException

我看不到任何地方表明 "register" 是某种保留词。

有人可以帮忙吗?替代方案可以建议一种在转换过程中更改字段名称的方法,请记住我无法展平 Debezium 生成的消息,因为我需要能够访问源数据库名称

  1. 是的 REGISTER是一个保留字,你应该在你的DDL中避免它。您可以通过引用它来访问它,值得一试。

  2. 有一个用于删除字段的单一消息转换,但它不适用于嵌套数据。您可以尝试将 UnwrapFromEnvelope SMT 与一个 SMT 结合使用以重命名该字段。我没有尝试过此配置,但类似 ​​

    "transforms": "unwrap,renameField",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.renameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.renameField.renames": "register:notareservedword",