KSQL 从最新数据流中创建 table
KSQL create table from stream for latest data
我有一个名为客户的主题,我已经为其创建了一个流
CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');
我的 customers
主题制作人正在生成一个整数键和一个 json 值。但是当我看到行键被设置为某个二进制值时
ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}
现在,如果我创建一个 table,它会生成一行(可能是因为行键相同??)
CREATE TABLE customers (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');
在网上搜索后我偶然发现了这篇文章https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key并通过在键上重新分区创建了一个新流
CREATE STREAM customers_stream2 AS \
SELECT * FROM customers_stream \
PARTITION BY customerId;
那么如何创建一个具有最新客户数据值的 table?
从流创建 table 导致错误
CREATE TABLE customers_2_table_active AS
SELECT CUSTOMERID,ISACTIVE
FROM customers_stream2;
Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.
我需要各行的最新值,以便另一个微服务可以查询新的 table。
提前致谢
重新键入似乎是正确的方法,但是,您不能将 STREAM
直接转换为 TABLE
。
请注意,您重新输入密钥的流 customers_stream2
已写入相应的主题。因此,您应该能够从流的主题创建一个新的 TABLE
以获得每个键的最新值。
我有一个名为客户的主题,我已经为其创建了一个流
CREATE STREAM customers_stream (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='json');
我的 customers
主题制作人正在生成一个整数键和一个 json 值。但是当我看到行键被设置为某个二进制值时
ksql> print 'customers';
Format:JSON
{"ROWTIME":1570305904984,"ROWKEY":"\u0000\u0000\u0003�","customerId":1001,"isActive":true}
{"ROWTIME":1570307584257,"ROWKEY":"\u0000\u0000\u0003�","customerId":1002,"isActive":true}
现在,如果我创建一个 table,它会生成一行(可能是因为行键相同??)
CREATE TABLE customers (customerId INT, isActive BOOLEAN)
WITH (KAFKA_TOPIC='customers', KEY='customerId',VALUE_FORMAT='json');
在网上搜索后我偶然发现了这篇文章https://www.confluent.io/stream-processing-cookbook/ksql-recipes/setting-kafka-message-key并通过在键上重新分区创建了一个新流
CREATE STREAM customers_stream2 AS \
SELECT * FROM customers_stream \
PARTITION BY customerId;
那么如何创建一个具有最新客户数据值的 table?
从流创建 table 导致错误
CREATE TABLE customers_2_table_active AS
SELECT CUSTOMERID,ISACTIVE
FROM customers_stream2;
Invalid result type. Your SELECT query produces a STREAM. Please use CREATE STREAM AS SELECT statement instead.
我需要各行的最新值,以便另一个微服务可以查询新的 table。
提前致谢
重新键入似乎是正确的方法,但是,您不能将 STREAM
直接转换为 TABLE
。
请注意,您重新输入密钥的流 customers_stream2
已写入相应的主题。因此,您应该能够从流的主题创建一个新的 TABLE
以获得每个键的最新值。