JDBC Kafka Connect using multiple tables with resulting nested JSON and arrays

I'm using Kafka Connect with the JDBC source connector. I am trying to get nested JSON with arrays from these tables:

/* Create tables, in this case DB2 */
CREATE TABLE contacts(
    contact_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
    first_name VARCHAR(100) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(contact_id)
);

CREATE TABLE phones(
    phone_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
    phone_no VARCHAR(20) NOT NULL,
    phone_type VARCHAR(10) NOT NULL,
    contact_id INT NOT NULL,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(phone_id),
    FOREIGN KEY (contact_id)
        REFERENCES contacts(contact_id)
        ON UPDATE NO ACTION
        ON DELETE CASCADE
);

/* Insert some data */
INSERT INTO contacts(first_name, last_name) 
    VALUES
        ('John','Doe');

INSERT INTO phones(phone_no, phone_type, contact_id)
    VALUES
        ('Johns phone #1','HOME',1),
        ('Johns phone #2','MOBILE',1),
        ('Johns phone #3','WORK',1);

The JSON I'd like to publish to a Kafka topic via Kafka Connect looks like this (minor adjustments are fine):

{
    "contact_id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "modified_at": "2022-03-16T13:33:04.276",
    "phones":
    [
        {
            "phone_id": 1,
            "phone_no": "Johns phone #1",
            "phone_type": "HOME",
            "contact_id": 1,
            "modified_at": "2022-03-16T13:33:05.101"
        },
        {
            "phone_id": 2,
            "phone_no": "Johns phone #2",
            "phone_type": "MOBILE",
            "contact_id": 1,
            "modified_at": "2022-03-16T13:33:05.210"
        },
        {
            "phone_id": 3,
            "phone_no": "Johns phone #3",
            "phone_type": "WORK",
            "contact_id": 1,
            "modified_at": "2022-03-16T13:33:05.673"
        }
    ]
}

How can I do this with Kafka Connect (that is, with the Kafka Connect configuration)?

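The JDBC connector doesn't support array types.

You can write a VIEW that defines a JOIN between your tables, then set query in the connector configuration to select from it. That produces one flat event per joined row, such as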

Record 1

{
    "contact_id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "modified_at": "2022-03-16T13:33:04.276",
    "phone_id": 1,
    "phone_no": "Johns phone #1",
    "phone_type": "HOME",
    "contact_id": 1,
    "modified_at": "2022-03-16T13:33:05.101"
}

Record 2

{
    "contact_id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "modified_at": "2022-03-16T13:33:04.276",
    "phone_id": 2,
    "phone_no": "Johns phone #2",
    "phone_type": "MOBILE",
    "contact_id": 1,
    "modified_at": "2022-03-16T13:33:05.210"
}
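
As a rough sketch of the view-plus-query approach, the snippet below builds a JDBC source connector configuration as a Python dict and prints the JSON you could submit to the Kafka Connect REST API. The view name CONTACT_PHONES, the column alias PHONE_MODIFIED_AT, the connector name, the topic name, and the connection URL are all assumptions made for illustration. The view itself (a JOIN of contacts and phones, with the two modified_at columns aliased so their names don't collide) would have to be created in the database first.

```python
import json

# All names below (connector name, view, column alias, topic, URL) are
# placeholders for illustration, not values taken from the question.
connector = {
    "name": "contact-phones-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        # Placeholder DB2 URL; credentials are omitted on purpose.
        "connection.url": "jdbc:db2://localhost:50000/SAMPLE",
        # Custom query against the assumed JOIN view. In timestamp mode the
        # connector appends its own WHERE clause, so the query has none.
        "query": "SELECT * FROM CONTACT_PHONES",
        "mode": "timestamp",
        # Assumed alias for phones.modified_at inside the view.
        "timestamp.column.name": "PHONE_MODIFIED_AT",
        # With a custom query, topic.prefix is used as the full topic name.
        "topic.prefix": "contact-phones",
        "poll.interval.ms": "5000",
    },
}

# Serialize to the JSON body expected when creating a connector.
payload = json.dumps(connector, indent=2)
print(payload)
```

Note that tracking only the phone timestamp means a change to a contact row alone would likely not be re-emitted; whether that matters depends on your data.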

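You would then need a stream processing library to group the joined records by contact_id, for example. Alternatively, you can use Debezium / JDBC Source on the individual tables, then perform the JOIN and create the array in Kafka Streams / KSQL.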
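
To make the group-by step concrete, here is a minimal sketch in plain Python of folding flat joined records into one nested document per contact_id. It illustrates only the aggregation logic, not the Kafka Streams or KSQL API, where the same idea would be expressed as a group-by followed by an aggregate. The function name nest_phones and the aliased field names contact_modified_at and phone_modified_at are assumptions: the joined rows need distinct names for the two modified_at columns.

```python
import json

# Assumed field names; the two modified_at columns are aliased to avoid a clash.
CONTACT_FIELDS = ("contact_id", "first_name", "last_name", "contact_modified_at")
PHONE_FIELDS = ("phone_id", "phone_no", "phone_type", "phone_modified_at")


def nest_phones(flat_records):
    """Fold flat joined rows into one nested document per contact_id."""
    contacts = {}
    for row in flat_records:
        contact = contacts.setdefault(
            row["contact_id"], {"fields": {}, "phones": {}}
        )
        # Later rows overwrite earlier ones, so the newest values win.
        contact["fields"] = {name: row[name] for name in CONTACT_FIELDS}
        # Keying by phone_id keeps a re-delivered phone row from duplicating.
        contact["phones"][row["phone_id"]] = {
            name: row[name] for name in PHONE_FIELDS
        }
    return [
        {**c["fields"], "phones": list(c["phones"].values())}
        for c in contacts.values()
    ]


# Sample rows mirroring the data from the question.
john = {
    "contact_id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "contact_modified_at": "2022-03-16T13:33:04.276",
}
rows = [
    {**john, "phone_id": 1, "phone_no": "Johns phone #1",
     "phone_type": "HOME", "phone_modified_at": "2022-03-16T13:33:05.101"},
    {**john, "phone_id": 2, "phone_no": "Johns phone #2",
     "phone_type": "MOBILE", "phone_modified_at": "2022-03-16T13:33:05.210"},
    {**john, "phone_id": 3, "phone_no": "Johns phone #3",
     "phone_type": "WORK", "phone_modified_at": "2022-03-16T13:33:05.673"},
]

nested = nest_phones(rows)
print(json.dumps(nested, indent=4))
```

In a real stream processor the per-contact state would live in a state store and an updated document would be emitted as each row arrives, rather than once over a finished list.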