How to create an output stream (changelog) based on a table in KSQL correctly?
Step 1: Create the table
I currently have a table in KSQL, created with:
CREATE TABLE cdc_window_table
WITH (KAFKA_TOPIC='cdc_stream',
VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
This creates a new table, which I can inspect with:
SELECT *
FROM cdc_window_table
EMIT CHANGES;
which returns data like:
+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART |WINDOWEND |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1 |1648767460000 |1648767480000 |1 |
|a1 |1648767460000 |1648767480000 |2 |
|a1 |1648767460000 |1648767480000 |3 |
|a1 |1648767480000 |1648767500000 |1 |
|a1 |1648767740000 |1648767760000 |1 |
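The WINDOWSTART and WINDOWEND values above follow the usual tumbling-window alignment: each record timestamp is bucketed into a fixed 20-second window aligned to the epoch. A minimal Python sketch of that bucketing (illustrative only, not KSQL internals):

```python
# Compute the tumbling window that a record timestamp falls into.
# Timestamps and window size are in milliseconds, as in the KSQL output above.
def tumbling_window(timestamp_ms: int, size_ms: int) -> tuple[int, int]:
    start = timestamp_ms - (timestamp_ms % size_ms)  # align to window boundary
    return start, start + size_ms

# A record landing in the first window of the output above:
print(tumbling_window(1648767465123, 20_000))  # -> (1648767460000, 1648767480000)
```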
Step 2: Create the output stream (changelog) - fails
I am trying to create an output stream (changelog) based on this table, as shown in this diagram:
(Image source: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)
After reading that post, I tried these 4 approaches:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
application_id_count INT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON');
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='TUMBLING',
WINDOW_SIZE='20 SECONDS');
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='SESSION');
CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='cdc_window_table',
VALUE_FORMAT='JSON',
WINDOW_TYPE='SESSION');
When I query:
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
it only shows the table header, without any changelog data:
+------------------+-----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+------------------+-----------------------+
What is the correct way to create an output stream (changelog) based on a table?
In Step 2, instead of the topic cdc_window_table, I should use a topic like _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition. This changelog topic for the table was created automatically by KSQL when I created the table earlier.
You can find this long changelog topic name with:
show all topics;
(Note the all above. Without it, the changelog topic will not be listed.)
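Before defining a stream over it, you can also sanity-check that the changelog topic actually contains records by printing it directly (standard ksqlDB syntax; the topic name is abbreviated the same way as above):

```sql
-- Inspect the raw keys and values of the first few records in the changelog topic.
PRINT '_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition' FROM BEGINNING LIMIT 5;
```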
The working KSQL is:
CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
application_id_count BIGINT)
WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
VALUE_FORMAT='JSON');
(Note the KEY after application_id STRING above. Without KEY, application_id will show up as null in the stream.)
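That null behavior makes sense if you consider how the records are laid out: the grouping column lives in the Kafka message key, not in the JSON value, so a column declared without KEY is looked up in the value and comes back null. A rough sketch of that lookup, assuming JSON values and a plain string key (hypothetical record shapes, not ksqlDB's actual deserializer):

```python
import json

# A changelog record as it might sit in the topic: the grouping column is
# the message key; the JSON value holds only the other columns.
record_key = "a1"
record_value = json.loads('{"APPLICATION_ID_COUNT": 3}')

# Column declared WITHOUT KEY: looked up in the value JSON -> missing -> null.
print(record_value.get("APPLICATION_ID"))  # -> None

# Column declared WITH KEY: taken from the message key instead.
print(record_key)  # -> a1
```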
When I query:
SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;
I can now see:
+------------------+-----------------------+
|APPLICATION_ID |APPLICATION_ID_COUNT |
+------------------+-----------------------+
|a1 |null |
|a1 |null |
|a1 |null |
I am not sure why application_id_count is null, but application_id is all I care about for my use case. I will update this answer if I find a solution, or if someone else knows one.
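One plausible explanation for the null count (an assumption, not something I have verified): a -repartition topic holds the re-keyed input records from before the aggregation runs, so its values never contain the aggregate column, and declaring application_id_count as a value column on that topic looks up a field that is simply not there. Sketched with a hypothetical record value:

```python
import json

# Hypothetical value of a record in the ...-repartition topic: the
# pre-aggregation input row, re-keyed by application_id. The aggregate
# column does not exist yet at this point in the pipeline.
repartition_value = json.loads('{"AFTER": {"APPLICATION_ID": "a1"}}')

# Declaring application_id_count as a value column on this topic means
# looking up a field that is absent -> null in the stream.
print(repartition_value.get("APPLICATION_ID_COUNT"))  # -> None
```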