How to select a value in a JSON string with KSQL?
I have a JSONB field named metadata in a Postgres table. When I use the Debezium PostgreSQL connector to generate CDC events, it writes metadata to Kafka as a string.
This is the CDC event I get in the Kafka topic my_db_server.public.product:
{
"before": null,
"after": {
"id": "322f13b2-9a0e-407e-94c1-633c7b2a6ca1",
"metadata": "{\"operation\": \"CREATE\"}" <-- Here is a string
},
"source": {
"version": "1.8.0.Final",
"connector": "postgresql",
"name": "my_db_server",
"ts_ms": 1648074184197,
"snapshot": "false",
"db": "my_db",
"sequence": "[\"25825800\",\"25833896\"]",
"schema": "public",
"table": "product",
"txId": 673,
"lsn": 25833896,
"xmin": null
},
"op": "c",
"ts_ms": 1648074184256,
"transaction": null
}
Debezium writes Postgres JSON/JSONB fields to Kafka as strings by design; see https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-basic-types
I created a Kafka stream with KSQL:
CREATE STREAM my_stream (after STRUCT<id STRING,
metadata STRING>)
WITH (KAFKA_TOPIC='my_db_server.public.product',
VALUE_FORMAT='JSON');
Next I want to do some work based on the operation value inside the metadata field. I can select after->metadata, but not after->metadata->operation, because metadata is a string:
SELECT after->id,
after->metadata->operation, <-- Gives error: Expected STRUCT type, got: STRING
COUNT(after->id),
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after <-- I hope to change to after->metadata->operation next
EMIT CHANGES;
This gives me the error:
Expected STRUCT type, got: STRING
What is the correct way to select after->metadata->operation in this case? Or is there something I can do when creating the stream? Thanks!
You can use the EXTRACTJSONFIELD function to access operation:
SELECT after->id,
EXTRACTJSONFIELD(after->metadata, '$.operation'),
COUNT(after->id),
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after
EMIT CHANGES;
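The question also asks about grouping by operation and about doing the work at stream-creation time. One possible approach, as a rough sketch only: derive a second stream that extracts the field once, then aggregate on it. The names product_operations, operation, and event_count are made up for illustration and are not from the original post; this assumes my_stream is defined as shown in the question and that your ksqlDB version accepts CREATE STREAM ... AS SELECT with these expressions.

```sql
-- Hypothetical derived stream: flattens the Debezium envelope and
-- extracts "operation" from the JSON string stored in after->metadata.
CREATE STREAM product_operations AS
  SELECT after->id AS id,
         EXTRACTJSONFIELD(after->metadata, '$.operation') AS operation
  FROM my_stream
  EMIT CHANGES;

-- Count events per operation in 20-second tumbling windows.
-- operation is now a plain STRING column, so it can be used in GROUP BY.
SELECT operation,
       COUNT(*) AS event_count,
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end
FROM product_operations
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY operation
EMIT CHANGES;
```

EXTRACTJSONFIELD returns a STRING (or NULL when the path is missing or the input is NULL, as it would be for delete events where after is null), so you may want to filter those rows out before aggregating.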