How to correctly create an output stream (changelog) based on a table in KSQL?

Step 1: Create the table

I currently have a table in KSQL, created by:
CREATE TABLE cdc_window_table
    WITH (KAFKA_TOPIC='cdc_stream',
          VALUE_FORMAT='JSON') AS
SELECT after->application_id AS application_id,
       COUNT(*) AS application_id_count
FROM cdc_stream
WINDOW TUMBLING (SIZE 20 SECONDS)
GROUP BY after->application_id
EMIT CHANGES;
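
For context, cdc_stream is a stream whose value contains an after struct with an application_id field. A minimal sketch of that kind of stream definition is below; the field list and the topic name cdc_source_topic are placeholders, not the exact CDC schema:

CREATE STREAM cdc_stream (
    after STRUCT<application_id VARCHAR>  -- assumed shape; a real CDC payload carries more fields
  ) WITH (KAFKA_TOPIC='cdc_source_topic', -- hypothetical source topic name
          VALUE_FORMAT='JSON');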

The CREATE TABLE above creates a new table, which I can view with:
SELECT *
FROM cdc_window_table
EMIT CHANGES;

which returns data like:

+---------------+---------------+---------------+---------------------+
|APPLICATION_ID |WINDOWSTART    |WINDOWEND      |APPLICATION_ID_COUNT |
+---------------+---------------+---------------+---------------------+
|a1             |1648767460000  |1648767480000  |1                    |
|a1             |1648767460000  |1648767480000  |2                    |
|a1             |1648767460000  |1648767480000  |3                    |
|a1             |1648767480000  |1648767500000  |1                    |
|a1             |1648767740000  |1648767760000  |1                    |

Step 2: Create the output stream (changelog) - failed

I am trying to create an output stream (changelog) based on this table, as shown in this diagram:

(Image source: https://www.confluent.io/blog/kafka-streams-tables-part-1-event-streaming/)

After reading that, I tried these 4 approaches:

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING,
                                                 application_id_count INT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='TUMBLING',
         WINDOW_SIZE='20 SECONDS');

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');

CREATE STREAM cdc_window_table_changelog_stream (ROWKEY STRING KEY,
                                                 application_id_count BIGINT) 
   WITH (KAFKA_TOPIC='cdc_window_table',
         VALUE_FORMAT='JSON',
         WINDOW_TYPE='SESSION');

When I query:

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;

it only shows the table header, without any changelog data:

+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+

What is the correct way to create an output stream (changelog) from a table?

In step 2, instead of the topic cdc_window_table, I should use a topic named like _confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition.

This changelog topic was created automatically by KSQL when I created the table in step 1.

You can find this long changelog topic name with:
show all topics;

(Note the all above. Without it, the changelog topic will not be listed.)
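
You can also peek at the raw records in that topic before defining a stream over it, to confirm what the key and value actually look like. For example (using the placeholder topic name from above; yours will differ):

PRINT '_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition' FROM BEGINNING LIMIT 5;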

The working KSQL is then:

CREATE STREAM cdc_window_table_changelog_stream (application_id STRING KEY,
                                                 application_id_count BIGINT)
  WITH (KAFKA_TOPIC='_confluent-ksql-xxx-ksqlquery_CTAS_CDC_WINDOW_TABLE_271-Aggregate-GroupBy-repartition',
        VALUE_FORMAT='JSON');

(Note the KEY after application_id STRING above. Without KEY, application_id would show up as null in the stream.)
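
You can double-check how the stream was registered, i.e. which column is the key and which columns are in the value, with:

DESCRIBE cdc_window_table_changelog_stream;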

When I query:

SELECT *
FROM cdc_window_table_changelog_stream
EMIT CHANGES;

this time I can see:

+------------------+-----------------------+
|APPLICATION_ID    |APPLICATION_ID_COUNT   |
+------------------+-----------------------+
|a1                |null                   |
|a1                |null                   |
|a1                |null                   |

I am not sure why application_id_count is null, but application_id is what I care about for my use case. I will update this answer if I find a solution, or if someone else knows.