无法将 KSQL 聚合 table 保存到 Postgres

Cannot persist KSQL aggregate table to Postgres

我正在尝试使用 JDBC 接收器连接器在我的 Postgres 数据库中镜像 KSQL table,但不幸的是我无法使其工作。

我正在使用 Kafka 5.4.1,并且我有 2 个 debezium 1.0 主题使用来自我的 Postgres 数据库的 Avro 序列化。这是我的 Debezium 连接器的配置:

    {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.dbname": "xxx",
        "tasks.max": "1",
        "database.history.kafka.bootstrap.servers": "kafka-svc:9092",
        "database.history.kafka.topic": "dbhistory.xxx",
        "database.server.name": "xxx",
        "database.port": "5432",
        "plugin.name": "decoderbufs",
        "table.whitelist": "public.a,public.b",
        "database.hostname": "app-db",
        "name": "connector",
        "connection.url": "jdbc:postgresql://app-db:5432/xxx",
        "database.whitelist": "xxx",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.add.source.fields": "table"
    }

然后我使用 KSQL CLI 与我的服务器交互并发出以下命令:

CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');

CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');

CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;

CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;

TLDR,我从 debezium 主题创建了 2 个流并将它们重新分区以使它们为 JOIN 做好准备。 然后,我将其中一个 (b_by_id) 变成 table,因为在这种情况下我不想使用窗口连接:

CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');

此时一切正常,我可以使用我的流和 tables 并加入并看到我的源数据库中的更改立即反映在我在 KSQL 中的流式查询中。 当我决定对我的数据执行一些聚合函数并将结果镜像到我的 Postgres 数据库(与源数据库相同)时,我的问题就出现了。为此,我创建了一个新的 KSQL table 作为 SELECT:

的结果
CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;

然后,我设置了一个 JDBC 接收器连接器,以使用以下配置将我的新 table 的 grouped_data 更新日志主题转储到我的数据库中:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://app-db:5432/xxx",
    "insert.mode": "upsert",
    "auto.create": true,
    "auto.evolve": true,
    "topics": "grouped_data",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-svc:8081",
    "pk.mode": "record_value",
    "pk.fields": "x, y, z",
    "table.name.format" : "kafka_${topic}",
    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "max_date",
    "transforms.TimestampConverter.target.type": "Timestamp"
}

不幸的是,我的接收器 DB 上没有任何错误,也没有任何数据。连接器已正确创建和配置,即使我强制流式查询处理新消息,也没有数据传输到我的接收器数据库,甚至没有创建目标 table。 我尝试使用不同的名称和配置、pk.mode 的不同值等多次创建连接器,但我无法让它工作。为我上面的 table "b" 创建一个连接器工作得很好,所有数据都会立即传输。

以下是我尝试镜像到 postgres 的 KSQL table 的更多详细信息:

describe extended grouped_data;

Name                 : GROUPED_DATA
Type                 : TABLE
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

 Field              | Type                      
------------------------------------------------
 ROWTIME            | BIGINT           (system) 
 ROWKEY             | VARCHAR(STRING)  (system) 
 X                  | BIGINT                    
 Y                  | BIGINT                    
 Z                  | BIGINT                    
 MAX_DATE           | BIGINT                    
------------------------------------------------

谢谢!

您已将 Kafka Connect 配置为使用小写主题名称

"topics": "grouped_data",

但是根据您的 DESCRIBE 输出,table 正在写入的主题是大写的:

Kafka topic          : GROUPED_DATA (partitions: 1, replication: 1)

如果您仔细检查 Kafka Connect worker 日志,您会发现:

Error while fetching metadata with correlation id 2 : {grouped_data=LEADER_NOT_AVAILABLE} 

如果您给 Kafka Connect 一个不存在的主题,Kafka Connect 不会中止 - 因为这可能是您 想要 指定的主题,因为您随后将填充它。

所以,您可以修改您的 Kafka Connect worker 配置以使用大写主题名称,或者您可以重新定义您的 ksqlDB table 并包含 …WITH (KAFKA_TOPIC='grouped_data') 在 DDL 中。