我们可以 select 来自融合的 kafka 主题的特定记录行吗?

Can we select a specific row of records from a confluent kafka topic?

在我本地的 Confluent Platform 中,我有 1 个主题调用“FOO_02”,我手动向其中插入了一些记录,因此,我可以根据以下命令从头开始打印它:

print 'FOO_02' from beginning;

我可以做类似的事情吗,我只想提取 COL1 = 1 的记录?像我们可以执行带有 where 条件的 select 语句从普通数据库中提取数据,例如 db2.

我尝试了以下命令,但我相信它只会获取新数据,因为我得到的是此命令的空记录:

ksql> select * from FOO_02 WHERE COL1=1 EMIT CHANGES;

我假设你已经完成了

CREATE STREAM FOO_02 WITH (KAFKA_TOPIC='FOO_02', FORMAT='AVRO');

否则你的 SELECT 就会失败。

因此,由于 PRINT 成功显示 topic 中有数据,您可以使用所需的谓词查询流。您唯一需要做的就是告诉 ksqlDB 处理 all 主题中的数据而不仅仅是新记录(这就是 from beginningPRINT 陈述)。为此,运行:

SET 'auto.offset.reset' = 'earliest';

然后然后运行SELECT.


编辑

Can we select the only latest records? For example, I have multiple data being push to topic for COL1=0, but I only want to grab the latest one because its the newest data and it is the only correct one. something like where rowtime = max?

您描述的是 TABLE:给定键的最新值。

CREATE TABLE FOO AS
  SELECT COL1, 
         LATEST_BY_OFFSET(COL2) AS COL2
    FROM FOO_02
   WHERE COL1=0
   GROUP BY COL1;

结果 table 将有一个 COL1 条目,随着收到新消息,最新值为 COL2

Any way to bring the old data in the Stream into the table as well?

为了处理现有数据,也要在 运行 执行 CREATE 语句

之前将偏移量设置回最早
SET 'auto.offset.reset' = 'earliest';

CREATE TABLE FOO AS
[…]