通过更改跟踪从 Postgres 到 Kafka

From Postgres to Kafka with changes tracking

这个问题在 之后。

主要任务是在KSQL端做join。下面的例子将说明它。事件消息到达 Kafka 主题。该消息的结构:

[
    {
        "name": "from_ts", 
        "type": "bigint"
    },
    {
        "name": "to_ts", 
        "type": "bigint"
    },
    {
        "name": "rulenode_id",
        "type": "int"
    }
]

还有一个 Postgres table rulenode:

id | name | description 

来自两个来源的数据需要按字段 rulenode_id = rulenode.id 连接,以便获得包含字段 from_ts, to_ts, rulenode_id, rulenode_name, rulenode_description.

的单条记录

我想通过 KSQL 来做这件事,而不是像现在这样使用后端。

现在来自 Postgres table 的数据通过 JdbcSourceConnector 传输到 Kafka。但是有一个小问题——您可能猜想 Postgres table 中的数据可能会被更改。当然,这些更改也应该在 KSQL 流或 table 中。

下面有人问我为什么是 KTable 而不是 Kstream。那么,请访问 this page 并查看第一个 GIF。当新数据到达时,table 的记录正在更新。我认为这样的行为是我需要的(我有 Postgres table rulenode 的主键 id 而不是名字 Alice,Bob)。这就是我选择KTable的原因。

JdbcSourceConnect 的批量模式复制所有 table。如您所知,所有行都到达 Kafka table 到以前的 Postgres table 快照。


按照建议,我创建了一个带有配置的连接器:

{
  "name": "from-pg",
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "errors.log.enable": "true",
  "connection.url": "connection.url",
  "connection.user": "postgres",
  "connection.password": "*************",
  "table.whitelist": "rulenode",
  "mode": "bulk",
  "poll.interval.ms": "5000",
  "topic.prefix": "pg."
}

然后创建了一个流:

create stream rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro', key='id');

现在正在尝试创建 table:

create table rulenodes_unique 
    as select * from rulenodes;

但这并没有奏效,出现错误:

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

我读到 tables 用于存储聚合信息。例如,使用 COUNT 函数存储聚合:

create table rulenodes_unique 
    as select id, count(*) from rulenodes order by id;

你能说说如何处理那个错误吗?

尚不清楚哪个语句引发错误,但如果在 table 定义

上会产生误导

您可以直接从主题创建 table。无需通过流

https://docs.confluent.io/current/ksql/docs/developer-guide/create-a-table.html

如果你也想使用流,正如文档所说

Use the CREATE TABLE <b>AS SELECT</b> statement to create a table with query results from an existing table or stream.

您可能希望在语句中使用区分大小写的值

CREATE STREAM rulenodes WITH (
    KAFKA_TOPIC ='pg.rules_rulenode', 
    VALUE_FORMAT='AVRO', 
    KEY='id'
);


CREATE TABLE rulenodes_unique AS
    SELECT id, COUNT(*) FROM rulenodes 
    ORDER BY id;

您可以使用 ksqlDB 在 Kafka 主题之上创建 STREAMTABLE - 这与您希望如何对数据建模有关。从您的问题可以清楚地看出,您需要将其建模为 table(因为您想加入 最新版本的密钥 )。所以你需要这样做:

create table rulenodes 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

现在您 还需要做一件事,即确保正确键入主题中的数据。您不能指定 key='id',它会自动发生 - key 参数只是一个 'hint'。您必须确保 Kafka 主题中的消息在 key 中具有 id 字段。有关详细信息,请参阅 ref doc

您可以使用 Single Message Transform in Kafka Connect 来做到这一点:

"transforms":"createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"id"

或者您可以在 ksqlDB 中执行此操作并更改密钥 - 因为我们要处理每个 事件 我们首先将其建模为 stream (!) 并在重新键入的主题上声明 table:

create stream rulenodes_source 
    with (kafka_topic='pg.rules_rulenode', value_format='avro');

CREATE STREAM RULENODES_REKEY AS SELECT * FROM rulenodes_source PARITION BY id;

CREATE TABLE rulenodes WITH (kafka_topic='RULENODES_REKEY', value_format='avro');

我会选择单一消息转换路线,因为它总体上更整洁、更简单。