如何通过 KSQL 在 JSON 字符串中 select 值?

How to select value in a JSON string by KSQL?

我在 Postgres table 中有一个名为 metadata 的 JSONB 字段。当我使用 Debezium PostgreSQL 连接器生成 CDC 时,它将 metadata 作为字符串写入 Kafka。

我在Kafka主题中得到的这个CDC my_db_server.public.product:

{
  "before": null,
  "after": {
    "id": "322f13b2-9a0e-407e-94c1-633c7b2a6ca1",
    "metadata": "{\"operation\": \"CREATE\"}".    <-- Here is a string
  },
  "source": {
    "version": "1.8.0.Final",
    "connector": "postgresql",
    "name": "my_db_server",
    "ts_ms": 1648074184197,
    "snapshot": "false",
    "db": "my_db",
    "sequence": "[\"25825800\",\"25833896\"]",
    "schema": "public",
    "table": "product",
    "txId": 673,
    "lsn": 25833896,
    "xmin": null
  },
  "op": "c",
  "ts_ms": 1648074184256,
  "transaction": null
}

将 Postgres JSON/JSONB 字段 CDC 保存为 Kafka 中的字符串是基于 https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-basic-types

的工作方式

我通过

通过 KSQL 创建了一个 Kafka 流
CREATE STREAM my_stream (after STRUCT<id STRING,
                                      metadata STRING>)
              WITH (KAFKA_TOPIC='my_db_server.public.product',
                    VALUE_FORMAT='JSON');

接下来我想在metadata领域做一些基于operation的工作。我可以 select after->metadata,但不能 after->metadata->operation 因为 metadata 是一个字符串:

SELECT after->id,
       after->metadata->operation,    <-- Give error: Expected STRUCT type, got: STRING
       COUNT(after->id),
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after    <-- I hope to change to after->metadata->operation next
EMIT CHANGES;

这会给我错误:

Expected STRUCT type, got: STRING

在这种情况下 select after->metadata->operation 的正确方法是什么?或者我可以在创建流时做一些工作吗?谢谢!

您可以使用 extractjsonfield 函数访问 operation

SELECT after->id,
       EXTRACTJSONFIELD(after->metadata, '$.operation'),
       COUNT(after->id),
       WINDOWSTART AS window_start,
       WINDOWEND AS window_end
FROM my_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after
EMIT CHANGES;