无法将 KSQL 聚合 table 保存到 Postgres
Cannot persist KSQL aggregate table to Postgres
我正在尝试使用 JDBC 接收器连接器在我的 Postgres 数据库中镜像 KSQL table,但不幸的是我无法使其工作。
我正在使用 Kafka 5.4.1,并且我有 2 个 debezium 1.0 主题使用来自我的 Postgres 数据库的 Avro 序列化。这是我的 Debezium 连接器的配置:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "xxx",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "kafka-svc:9092",
"database.history.kafka.topic": "dbhistory.xxx",
"database.server.name": "xxx",
"database.port": "5432",
"plugin.name": "decoderbufs",
"table.whitelist": "public.a,public.b",
"database.hostname": "app-db",
"name": "connector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"database.whitelist": "xxx",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.source.fields": "table"
}
然后我使用 KSQL CLI 与我的服务器交互并发出以下命令:
CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');
CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');
CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;
CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;
TLDR,我从 debezium 主题创建了 2 个流并将它们重新分区以使它们为 JOIN 做好准备。
然后,我将其中一个 (b_by_id) 变成 table,因为在这种情况下我不想使用窗口连接:
CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');
此时一切正常,我可以使用我的流和 tables 并加入并看到我的源数据库中的更改立即反映在我在 KSQL 中的流式查询中。
当我决定对我的数据执行一些聚合函数并将结果镜像到我的 Postgres 数据库(与源数据库相同)时,我的问题就出现了。为此,我创建了一个新的 KSQL table 作为 SELECT:
的结果
CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;
然后,我设置了一个 JDBC 接收器连接器,以使用以下配置将我的新 table 的 grouped_data 更新日志主题转储到我的数据库中:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"insert.mode": "upsert",
"auto.create": true,
"auto.evolve": true,
"topics": "grouped_data",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-svc:8081",
"pk.mode": "record_value",
"pk.fields": "x, y, z",
"table.name.format" : "kafka_${topic}",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "max_date",
"transforms.TimestampConverter.target.type": "Timestamp"
}
不幸的是,我的接收器 DB 上没有任何错误,也没有任何数据。连接器已正确创建和配置,即使我强制流式查询处理新消息,也没有数据传输到我的接收器数据库,甚至没有创建目标 table。
我尝试使用不同的名称和配置、pk.mode 的不同值等多次创建连接器,但我无法让它工作。为我上面的 table "b" 创建一个连接器工作得很好,所有数据都会立即传输。
以下是我尝试镜像到 postgres 的 KSQL table 的更多详细信息:
describe extended grouped_data;
Name : GROUPED_DATA
Type : TABLE
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : GROUPED_DATA (partitions: 1, replication: 1)
Field | Type
------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
X | BIGINT
Y | BIGINT
Z | BIGINT
MAX_DATE | BIGINT
------------------------------------------------
谢谢!
您已将 Kafka Connect 配置为使用小写主题名称
"topics": "grouped_data",
但是根据您的 DESCRIBE
输出,table 正在写入的主题是大写的:
Kafka topic : GROUPED_DATA (partitions: 1, replication: 1)
如果您仔细检查 Kafka Connect worker 日志,您会发现:
Error while fetching metadata with correlation id 2 : {grouped_data=LEADER_NOT_AVAILABLE}
如果您给 Kafka Connect 一个不存在的主题,Kafka Connect 不会中止 - 因为这可能是您 想要 指定的主题,因为您随后将填充它。
所以,您可以修改您的 Kafka Connect worker 配置以使用大写主题名称,或者您可以重新定义您的 ksqlDB table 并包含 …WITH (KAFKA_TOPIC='grouped_data')
在 DDL 中。
我正在尝试使用 JDBC 接收器连接器在我的 Postgres 数据库中镜像 KSQL table,但不幸的是我无法使其工作。
我正在使用 Kafka 5.4.1,并且我有 2 个 debezium 1.0 主题使用来自我的 Postgres 数据库的 Avro 序列化。这是我的 Debezium 连接器的配置:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.dbname": "xxx",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "kafka-svc:9092",
"database.history.kafka.topic": "dbhistory.xxx",
"database.server.name": "xxx",
"database.port": "5432",
"plugin.name": "decoderbufs",
"table.whitelist": "public.a,public.b",
"database.hostname": "app-db",
"name": "connector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"database.whitelist": "xxx",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.source.fields": "table"
}
然后我使用 KSQL CLI 与我的服务器交互并发出以下命令:
CREATE STREAM a_dbz
WITH (KAFKA_TOPIC='xxx.public.a', VALUE_FORMAT='AVRO');
CREATE STREAM b_dbz
WITH (KAFKA_TOPIC='xxx.public.b', VALUE_FORMAT='AVRO');
CREATE STREAM a_by_b_id
WITH (KAFKA_TOPIC='a_by_b_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM a_dbz PARTITION BY b_id;
CREATE STREAM b_by_id
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', PARTITIONS=1)
AS SELECT * FROM b_dbz PARTITION BY id;
TLDR,我从 debezium 主题创建了 2 个流并将它们重新分区以使它们为 JOIN 做好准备。 然后,我将其中一个 (b_by_id) 变成 table,因为在这种情况下我不想使用窗口连接:
CREATE TABLE b
WITH (KAFKA_TOPIC='b_by_id', VALUE_FORMAT='avro', KEY='id');
此时一切正常,我可以使用我的流和 tables 并加入并看到我的源数据库中的更改立即反映在我在 KSQL 中的流式查询中。 当我决定对我的数据执行一些聚合函数并将结果镜像到我的 Postgres 数据库(与源数据库相同)时,我的问题就出现了。为此,我创建了一个新的 KSQL table 作为 SELECT:
的结果CREATE TABLE grouped_data AS
SELECT x, y, z, MAX(date) AS max_date
FROM a_by_b_id
INNER JOIN b ON a_by_b_id.b_id = b.id
GROUP BY x, y, z
EMIT CHANGES;
然后,我设置了一个 JDBC 接收器连接器,以使用以下配置将我的新 table 的 grouped_data 更新日志主题转储到我的数据库中:
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:postgresql://app-db:5432/xxx",
"insert.mode": "upsert",
"auto.create": true,
"auto.evolve": true,
"topics": "grouped_data",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-svc:8081",
"pk.mode": "record_value",
"pk.fields": "x, y, z",
"table.name.format" : "kafka_${topic}",
"transforms": "TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.field": "max_date",
"transforms.TimestampConverter.target.type": "Timestamp"
}
不幸的是,我的接收器 DB 上没有任何错误,也没有任何数据。连接器已正确创建和配置,即使我强制流式查询处理新消息,也没有数据传输到我的接收器数据库,甚至没有创建目标 table。 我尝试使用不同的名称和配置、pk.mode 的不同值等多次创建连接器,但我无法让它工作。为我上面的 table "b" 创建一个连接器工作得很好,所有数据都会立即传输。
以下是我尝试镜像到 postgres 的 KSQL table 的更多详细信息:
describe extended grouped_data;
Name : GROUPED_DATA
Type : TABLE
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : GROUPED_DATA (partitions: 1, replication: 1)
Field | Type
------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
X | BIGINT
Y | BIGINT
Z | BIGINT
MAX_DATE | BIGINT
------------------------------------------------
谢谢!
您已将 Kafka Connect 配置为使用小写主题名称
"topics": "grouped_data",
但是根据您的 DESCRIBE
输出,table 正在写入的主题是大写的:
Kafka topic : GROUPED_DATA (partitions: 1, replication: 1)
如果您仔细检查 Kafka Connect worker 日志,您会发现:
Error while fetching metadata with correlation id 2 : {grouped_data=LEADER_NOT_AVAILABLE}
如果您给 Kafka Connect 一个不存在的主题,Kafka Connect 不会中止 - 因为这可能是您 想要 指定的主题,因为您随后将填充它。
所以,您可以修改您的 Kafka Connect worker 配置以使用大写主题名称,或者您可以重新定义您的 ksqlDB table 并包含 …WITH (KAFKA_TOPIC='grouped_data')
在 DDL 中。