JDBC Kafka Connect using multiple tables with resulting nested JSON and arrays
Kafka Connect with the JDBC source connector.
I am trying to get nested JSON with arrays out of my tables:
/* Create tables, in this case DB2 */
CREATE TABLE contacts(
contact_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(contact_id)
);
CREATE TABLE phones(
phone_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
phone_no VARCHAR(20) NOT NULL,
phone_type VARCHAR(10) NOT NULL,
contact_id INT NOT NULL,
modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(phone_id),
FOREIGN KEY (contact_id)
REFERENCES contacts(contact_id)
ON UPDATE NO ACTION
ON DELETE CASCADE
);
/* Insert some data */
INSERT INTO contacts(first_name, last_name)
VALUES
('John','Doe');
INSERT INTO phones(phone_no, phone_type, contact_id)
VALUES
('Johns phone #1','HOME',1),
('Johns phone #2','MOBILE',1),
('Johns phone #3','WORK',1);
The JSON I would like to see on the Kafka topic via Kafka Connect looks like this (minor adjustments are fine):
{
"contact_id": 1,
"first_name": "John",
"last_name": "Doe",
"modified_at": "2022-03-16T13:33:04.276",
"phones":
[
{
"phone_id": 1,
"phone_no": "Johns phone #1",
"phone_type": "HOME",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.101"
},
{
"phone_id": 2,
"phone_no": "Johns phone #2",
"phone_type": "MOBILE",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.210"
},
{
"phone_id": 3,
"phone_no": "Johns phone #3",
"phone_type": "WORK",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.673"
}
]
}
How can I do this with Kafka Connect (i.e. with the Kafka Connect configuration)?
The JDBC connector does not support array types.
You could write a VIEW that defines a JOIN between your tables, then set the query option in the connector. That would create distinct flat events, e.g.
Record 1
{
"contact_id": 1,
"first_name": "John",
"last_name": "Doe",
"modified_at": "2022-03-16T13:33:04.276",
"phone_id": 1,
"phone_no": "Johns phone #1",
"phone_type": "HOME",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.101"
}
Record 2
{
"contact_id": 1,
"first_name": "John",
"last_name": "Doe",
"modified_at": "2022-03-16T13:33:04.276",
"phone_id": 2,
"phone_no": "Johns phone #2",
"phone_type": "MOBILE",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.210"
}
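As a sketch, the JOIN view described above could look like the following (the view name and column aliases are illustrative; the aliases also avoid the duplicate contact_id / modified_at column names that would otherwise collide in the flat result):

```sql
/* Hypothetical view flattening contacts and their phones into one row per phone.
   Aliases disambiguate the columns that exist in both tables. */
CREATE VIEW contact_phones AS
SELECT c.contact_id,
       c.first_name,
       c.last_name,
       c.modified_at AS contact_modified_at,
       p.phone_id,
       p.phone_no,
       p.phone_type,
       p.modified_at AS phone_modified_at
FROM contacts c
JOIN phones p
  ON p.contact_id = c.contact_id;
```

Pointing the connector at this view (via the query option, or via table.whitelist where the connector is configured to pick up views) would then emit flat records like the two shown above.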
You would then need to group by contact_id using a stream-processing library, for example. Alternatively, you could run Debezium or the JDBC Source against the individual tables, then perform the JOIN in Kafka Streams / ksqlDB and build the array there.
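The ksqlDB route could be sketched roughly as follows. This is illustrative only: the topic name, stream name, and value format are assumptions, and depending on the ksqlDB version, COLLECT_LIST may be limited to primitive types rather than STRUCTs:

```sql
-- Hypothetical ksqlDB sketch: collect phone rows into an array per contact.
-- Assumes a "phones" topic fed by a per-table source connector with JSON values.
CREATE STREAM phones_raw (
    phone_id INT,
    phone_no VARCHAR,
    phone_type VARCHAR,
    contact_id INT
  ) WITH (KAFKA_TOPIC = 'phones', VALUE_FORMAT = 'JSON');

CREATE TABLE phones_by_contact AS
  SELECT contact_id,
         COLLECT_LIST(STRUCT(phone_id := phone_id,
                             phone_no := phone_no,
                             phone_type := phone_type)) AS phones
  FROM phones_raw
  GROUP BY contact_id;
```

The resulting table could then be joined with a contacts table on contact_id to produce something close to the nested JSON in the question.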