ksqlDB: Joining tables/streams with nested structure (not flattened)
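I have two different topics, each with its own Avro schema, X and Y. Both topics have a lot of fields.
I want to create a table-stream join between them and output it to another topic in the following format: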
{
id, // the id used to join them
x_name : X,
y_name: Y
}
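In other words, I want the join result to nest each source as a whole.
I was able to join them the normal way, but then all the fields get flattened.
Is this possible with ksqlDB? I tried to find a good way to do it, without success.
Edit:
Adding more information and an example.
Suppose I have two topics with data like this.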
product_supply
{
"product_id": 1,
"name": "name",
"stock": 11,
"price": "141",
"storage_ids": [1, 2, 3]
}
product_information
{
"product_id": 1,
"description": "151",
"manufacturer": "ABC",
"Vendor_id": "5"
}
I want to use ksqlDB to join these tables without flattening them and publish the result to a topic, like this:
{
"product_id": 1,
"product_information": {
"product_id": 1,
"description": "151",
"manufacturer": "ABC",
"Vendor_id": "5"
},
"product_supply": {
"product_id": 1,
"name": "name",
"stock": 11,
"price": "141",
"storage_ids": [1, 2, 3]
}
}
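I have already registered a schema for each topic, and if possible I would like to use those schemas without having to define every field explicitly in ksql.
There is a good guide on working with structured data in ksqlDB. Based on that, I was able to get it working:
Create sample data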
CREATE STREAM PRODUCT_SUPPLY (PRODUCT_ID INT, NAME VARCHAR, STOCK INT, PRICE INT, STORAGE_IDS ARRAY<INT>) WITH (KAFKA_TOPIC='PRODUCT_SUPPLY', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
CREATE TABLE PRODUCT_INFORMATION (PRODUCT_ID INT PRIMARY KEY, DESCRIPTION VARCHAR, MANUFACTURER VARCHAR, VENDOR_ID INT) WITH (KAFKA_TOPIC='PRODUCT_INFO', REPLICAS=1,PARTITIONS=6,VALUE_FORMAT='AVRO');
INSERT INTO PRODUCT_SUPPLY VALUES(1,'NAME',11,141,ARRAY[1,2,3]);
INSERT INTO PRODUCT_INFORMATION values (1,'151','abc',5);
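On the part of the question about not spelling out every field: a sketch of an alternative to the explicit column lists above, assuming the topics already exist and their Avro value schemas are registered in Schema Registry. In that case ksqlDB can infer the value columns when the column list is left out. The topic names and the key column name PI_KEY are assumptions made for illustration. The table's key column still has to be declared by hand, and it is given a different name here so it does not clash with the product_id field in the value.

```sql
-- Assumes topic 'product_supply' exists with a registered Avro value schema;
-- the value columns are inferred from Schema Registry.
CREATE STREAM PRODUCT_SUPPLY
  WITH (KAFKA_TOPIC='product_supply', VALUE_FORMAT='AVRO');

-- A table needs its PRIMARY KEY declared explicitly.
-- PI_KEY is a hypothetical name; this assumes the record key holds the product id as an INT.
CREATE TABLE PRODUCT_INFORMATION (PI_KEY INT PRIMARY KEY)
  WITH (KAFKA_TOPIC='product_information', VALUE_FORMAT='AVRO');
```

With these definitions the join condition would become PS.PRODUCT_ID = PI.PI_KEY. The fields inside each STRUCT(...) in the query most likely still have to be listed one by one, since the nesting is built in the SELECT rather than taken from the schema.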
Query the data
SET 'auto.offset.reset' = 'earliest';
SELECT PS.PRODUCT_ID AS PRODUCT_ID,
STRUCT(NAME := PS.NAME,
STOCK := PS.STOCK,
PRICE := PS.PRICE,
STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
STRUCT(DESCRIPTION := PI.DESCRIPTION,
MANUFACTURER := PI.MANUFACTURER,
VENDOR_ID := PI.VENDOR_ID) AS PRODUCT_INFORMATION
FROM PRODUCT_SUPPLY PS
LEFT JOIN PRODUCT_INFORMATION PI
ON PS.PRODUCT_ID=PI.PRODUCT_ID
EMIT CHANGES LIMIT 1;
+-------------------------+-------------------------+-------------------------+
|PRODUCT_ID |PRODUCT_SUPPLY |PRODUCT_INFORMATION |
+-------------------------+-------------------------+-------------------------+
|1 |{NAME=NAME, STOCK=11, PRI|{DESCRIPTION=151, MANUFAC|
| |CE=141, STORAGE_IDS=[1, 2|TURER=abc, VENDOR_ID=5} |
| |, 3]} | |
Limit Reached
Query terminated
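To publish the nested result to a topic rather than just print it, the same SELECT can be wrapped in a persistent query. This is a sketch only; the stream name and topic name PRODUCT_NESTED are made up for illustration.

```sql
-- Persistent query: writes the nested join result to the (hypothetical) topic PRODUCT_NESTED.
CREATE STREAM PRODUCT_NESTED
  WITH (KAFKA_TOPIC='PRODUCT_NESTED', VALUE_FORMAT='AVRO') AS
  SELECT PS.PRODUCT_ID AS PRODUCT_ID,
         -- each source is wrapped in its own STRUCT so the output is not flattened
         STRUCT(NAME := PS.NAME,
                STOCK := PS.STOCK,
                PRICE := PS.PRICE,
                STORAGE_IDS := PS.STORAGE_IDS) AS PRODUCT_SUPPLY,
         STRUCT(DESCRIPTION := PI.DESCRIPTION,
                MANUFACTURER := PI.MANUFACTURER,
                VENDOR_ID := PI.VENDOR_ID) AS PRODUCT_INFORMATION
  FROM PRODUCT_SUPPLY PS
  LEFT JOIN PRODUCT_INFORMATION PI
    ON PS.PRODUCT_ID = PI.PRODUCT_ID
  EMIT CHANGES;
```

Because PRODUCT_ID is the join key, it should end up in the record key of the output topic rather than in the value. If it is also needed inside the value, selecting AS_VALUE(PS.PRODUCT_ID) under a different column name is one option worth trying.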