我们可以 select 来自融合的 kafka 主题的特定记录行吗?
Can we select a specific row of records from a confluent kafka topic?
在我本地的 Confluent Platform 中,我有 1 个主题调用“FOO_02”,我手动向其中插入了一些记录,因此,我可以根据以下命令从头开始打印它:
print 'FOO_02' from beginning;
我可以做类似的事情吗,我只想提取 COL1 = 1 的记录?像我们可以执行带有 where 条件的 select 语句从普通数据库中提取数据,例如 db2
.
我尝试了以下命令,但我相信它只会获取新数据,因为我得到的是此命令的空记录:
ksql> select * from FOO_02 WHERE COL1=1 EMIT CHANGES;
我假设你已经完成了
CREATE STREAM FOO_02 WITH (KAFKA_TOPIC='FOO_02', FORMAT='AVRO');
否则你的 SELECT
就会失败。
因此,由于 PRINT
成功显示 topic 中有数据,您可以使用所需的谓词查询流。您唯一需要做的就是告诉 ksqlDB 处理 all 主题中的数据而不仅仅是新记录(这就是 from beginning
在 PRINT
陈述)。为此,运行:
SET 'auto.offset.reset' = 'earliest';
然后然后运行SELECT
.
编辑
Can we select the only latest records? For example, I have multiple data being push to topic for COL1=0, but I only want to grab the latest one because its the newest data and it is the only correct one. something like where rowtime = max?
您描述的是 TABLE
:给定键的最新值。
CREATE TABLE FOO AS
SELECT COL1,
LATEST_BY_OFFSET(COL2) AS COL2
FROM FOO_02
WHERE COL1=0
GROUP BY COL1;
结果 table 将有一个 COL1
条目,随着收到新消息,最新值为 COL2
。
Any way to bring the old data in the Stream into the table as well?
为了处理现有数据,也要在 运行 执行 CREATE
语句
之前将偏移量设置回最早
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE FOO AS
[…]
在我本地的 Confluent Platform 中,我有 1 个主题调用“FOO_02”,我手动向其中插入了一些记录,因此,我可以根据以下命令从头开始打印它:
print 'FOO_02' from beginning;
我可以做类似的事情吗,我只想提取 COL1 = 1 的记录?像我们可以执行带有 where 条件的 select 语句从普通数据库中提取数据,例如 db2
.
我尝试了以下命令,但我相信它只会获取新数据,因为我得到的是此命令的空记录:
ksql> select * from FOO_02 WHERE COL1=1 EMIT CHANGES;
我假设你已经完成了
CREATE STREAM FOO_02 WITH (KAFKA_TOPIC='FOO_02', FORMAT='AVRO');
否则你的 SELECT
就会失败。
因此,由于 PRINT
成功显示 topic 中有数据,您可以使用所需的谓词查询流。您唯一需要做的就是告诉 ksqlDB 处理 all 主题中的数据而不仅仅是新记录(这就是 from beginning
在 PRINT
陈述)。为此,运行:
SET 'auto.offset.reset' = 'earliest';
然后然后运行SELECT
.
编辑
Can we select the only latest records? For example, I have multiple data being push to topic for COL1=0, but I only want to grab the latest one because its the newest data and it is the only correct one. something like where rowtime = max?
您描述的是 TABLE
:给定键的最新值。
CREATE TABLE FOO AS
SELECT COL1,
LATEST_BY_OFFSET(COL2) AS COL2
FROM FOO_02
WHERE COL1=0
GROUP BY COL1;
结果 table 将有一个 COL1
条目,随着收到新消息,最新值为 COL2
。
Any way to bring the old data in the Stream into the table as well?
为了处理现有数据,也要在 运行 执行 CREATE
语句
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE FOO AS
[…]