KSQL 创建 table 为 select

KSQL create table as select

CREATE TABLE AS SELECT 只能通过分组来完成,但是如何在 Base 中组织一个 table(你需要一个简单的 table 而不聚合/分组)以便用数据补充另一个流? 举个例子: 有一个带有字段 (product, city_id) 的流 A。 我们需要一个 table(或其他东西),其中包含由另一个线程补充的字段 (city_id、city_name)。

并且有一个流将补充流 A 与 table 中的名称连接起来。

如何使用外部目录组织数据丰富?

您可以使用 LATEST_BY_OFFSET 聚合以这种方式构建 table 数据。

CREATE STREAM source_city_data 
  WITH (KAFKA_TOPIC='source_city_data', FORMAT='AVRO');

CREATE TABLE city_data AS 
  SELECT city_id, LATEST_BY_OFFSET(city_name) AS city_name 
  FROM source_city_data;