如何用KSQLDB操作Kafka关键文件?

How to manipulate Kafka key documents with KSQLDB?

我有问题。我找不到通过过滤 kafka 文档的键来创建流的方法。

我想过滤和操作 kafka 密钥的 json 以检索以下示例的 payload,它对应于我的 couchbase id:

ksql> print 'cb_bench_products-get_purge' limit 1;

Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING

rowtime: 2022/03/04 10:49:43.643 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {[...]}}

您没有指定消息的 value 部分,因此我模拟了一些数据并假设它也是 JSON。首先我把它加载到一个主题中再次测试:

$ kcat -b localhost:9092 -t test -P -K!
{"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}!{"col1":"foo","col2":"bar","col3":42}
^D

使用ksqlDB(0.23.1-rc9)查看题目中的数据:

ksql> print 'test' from beginning;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/03/04 14:14:01.539 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {"col1":"foo","col2":"bar","col3":42}, partition: 0

在主题上声明一个流,使用 STRUCT 表示嵌套的 JSON。我假设您对 schema 不感兴趣,所以我省略了它。

CREATE STREAM my_test (
  my_key_col STRUCT < payload VARCHAR > KEY,
  col1 VARCHAR,
  col2 VARCHAR,
  col3 INT
) WITH (KAFKA_TOPIC = 'test', FORMAT = 'JSON');

在键上使用谓词查询流:

SET 'auto.offset.reset' = 'earliest';

SELECT my_key_col->payload, col1, col2, col3
  FROM my_test
 WHERE my_key_col->payload LIKE 'history%'
EMIT CHANGES;

+--------------------------------------------+-------+------+-------+
|PAYLOAD                                     |COL1   |COL2  |COL3   |
+--------------------------------------------+-------+------+-------+
|history::05000228023411_RO_RO11219082::80   |foo    |bar   |42     |