How to manipulate Kafka key documents with ksqlDB?
I have a problem: I can't find a way to create a stream by filtering on the key of the Kafka messages.
I want to filter and manipulate the JSON of the Kafka key to retrieve the payload in the example below, which corresponds to my Couchbase ID:
ksql> print 'cb_bench_products-get_purge' limit 1;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/03/04 10:49:43.643 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {[...]}}
You didn't specify the value part of your message, so I've mocked up some data and assumed it is also JSON. First I load it into a test topic:
$ kcat -b localhost:9092 -t test -P -K!
{"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}!{"col1":"foo","col2":"bar","col3":42}
^D
Examine the data in the topic with ksqlDB (0.23.1-rc9):
ksql> print 'test' from beginning;
Key format: JSON or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/03/04 14:14:01.539 Z, key: {"schema":{"type":"string","optional":false},"payload":"history::05000228023411_RO_RO11219082::80"}, value: {"col1":"foo","col2":"bar","col3":42}, partition: 0
Declare a stream over the topic, using a STRUCT to model the nested JSON in the key. I'm assuming you're not interested in the schema element, so I've omitted it:
CREATE STREAM my_test (
my_key_col STRUCT < payload VARCHAR > KEY,
col1 VARCHAR,
col2 VARCHAR,
col3 INT
) WITH (KAFKA_TOPIC = 'test', FORMAT = 'JSON');
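Before querying, you can sanity-check how ksqlDB registered the key and value columns (this DESCRIBE step is my addition, assuming the stream above was created successfully):
-- Inspect the inferred columns; MY_KEY_COL should be listed as the key column
DESCRIBE my_test;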
Query the stream with a predicate on the key:
SET 'auto.offset.reset' = 'earliest';
SELECT my_key_col->payload, col1, col2, col3
FROM my_test
WHERE my_key_col->payload LIKE 'history%'
EMIT CHANGES;
+--------------------------------------------+-------+------+-------+
|PAYLOAD |COL1 |COL2 |COL3 |
+--------------------------------------------+-------+------+-------+
|history::05000228023411_RO_RO11219082::80 |foo |bar |42 |
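If you want the filtered records written to a new, continuously populated stream (and backing topic) rather than just returned by a transient query, a persistent CREATE STREAM ... AS SELECT is a minimal sketch; the stream name history_purge and the column alias couchbase_id are illustrative, not from the question:
-- Hypothetical persistent query: keeps only keys starting with 'history',
-- re-keys the output on the extracted payload, and writes to a new topic.
CREATE STREAM history_purge AS
  SELECT my_key_col->payload AS couchbase_id,
         col1,
         col2,
         col3
  FROM my_test
  WHERE my_key_col->payload LIKE 'history%'
  PARTITION BY my_key_col->payload
  EMIT CHANGES;
Because of the PARTITION BY, the resulting stream should be keyed on couchbase_id, so downstream consumers see the Couchbase ID directly as the Kafka message key.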