KSQL 从最新数据流中创建 table

KSQL create table from stream for latest data

我有一个名为客户的主题,我已经为其创建了一个流

CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');

我的 customers 主题制作人正在生成一个整数键和一个 json 值。但是当我看到行键被设置为某个二进制值时

ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}

现在,如果我创建一个 table,它会生成一行(可能是因为行键相同??)

CREATE TABLE customers (customerId INT, isActive BOOLEAN)
  WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');

在网上搜索后我偶然发现了这篇文章https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key并通过在键上重新分区创建了一个新流

CREATE STREAM customers_stream2 AS \
 SELECT * FROM customers_stream \
 PARTITION BY customerId;

那么如何创建一个具有最新客户数据值的 table?

从流创建 table 导致错误

CREATE TABLE customers_2_table_active AS
  SELECT CUSTOMERID,ISACTIVE
  FROM customers_stream2;

Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.

我需要各行的最新值,以便另一个微服务可以查询新的 table。

提前致谢

重新键入似乎是正确的方法,但是,您不能将 STREAM 直接转换为 TABLE

请注意,您重新输入密钥的流 customers_stream2 已写入相应的主题。因此,您应该能够从流的主题创建一个新的 TABLE 以获得每个键的最新值。