如何直接从kafka主题查询?

How to query directly from a kafka topic?

我调查了 Interactive queries and KSQL 但我似乎无法弄清楚是否可以根据键查询特定记录。

假设我在一个主题中有一条记录,如下所示:

{
  key: 12314,
  value: 
    {
       id: "1",
       name: "bob"
    }
}

是否可以在主题中搜索关键字12314?还有KSQL和交互式查询会消耗整个topic做查询吗?

是的,您可以使用交互式查询来完成。

您可以创建一个 kafka 流来读取输入主题并生成状态存储(在 memory/rocksdb 中并与 kafka 同步)。

此状态存储可通过键查询 (ReadOnlyKeyValueStore)。

您在官方文档中有多个示例: https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html

假设您的值是有效的 JSON(即字段名称也被引用)那么您可以使用 KSQL/ksqlDB 轻松地做到这一点:

检查 ksqlDB 中的 Kafka 主题:

ksql> PRINT test3;
Format:JSON
1/9/20 12:11:35 PM UTC , 12314 , {"id": "1", "name": "bob"    } 

声明流:

ksql> CREATE STREAM FOO (ID VARCHAR, NAME VARCHAR) 
        WITH (KAFKA_TOPIC='test3',VALUE_FORMAT='JSON');

在数据到达时过滤流

ksql> SELECT ROWKEY, ID, NAME FROM FOO WHERE ROWKEY='12314' EMIT CHANGES;
+----------------------------+----------------------------+----------------------------+
|ROWKEY                      |ID                          |NAME                        |
+----------------------------+----------------------------+----------------------------+
|12314                       |1                           |bob                         |

大家总是忘记补充一点,如果底层数据集很小并且可以具体化,则可以使用交互式查询。

例如,您无法在庞大的主题中通过关键字高效地查找消息。至少我找不到这样的方法