无法从 table KSQL 获取数据

cannot get data from table KSQL

我创建了一个重新加密的流

CREATE STREAM details_stream_rekeyed2 as \
  select CONCAT(IdSeq,IdTime,'') as root,ServerId,Server,\
         IdTime ,IdSeq \
  from voip_details_stream  \
  partition by root;

select 从此流中,我得到 > 100 个项目

然后我尝试创建一个 table

create table voip_details_table3 \
  (ROOT varchar,ServerId long , Server varchar ,IdTime varchar,IdSeq long ) \
  with ( kafka_topic = 'DETAILS_STREAM_REKEYED2', \
         value_format = 'json',\
         key='ROOT');

当我 运行

SELECT ROWKEY,ROOT  FROM VOIP_DETAILS_TABLE3;

我只得到不到 10 个赞;

12018-04-04T18:56:35.080-04:00 | 12018-04-04T18:56:35.080-04:00

i 运行 命令:

kafkacat -C -K: -b "$BROKER_LIST" -f 'Key:    %k\nKey Bytes: %K\nValue:  %s\nValue Bytes: %S\n\n' -t DETAILS_STREAM_REKEYED2

一切都好。我得到了喜欢的数据

 Key:    12018-02-05T15:16:07.113-05:00
 Key Bytes: 30
 Value:  {"SERVER":null,"IDSEQ":1,"ROOT":"12018-02-05T15:16:07.113-05:00","SERVERID":null,"SESSIONIDTIME":"2018-02-05T15:16:07.113-05:00"}

值字节:158

哪里出了问题?

KSQL table 不同于 KSQL 流,因为它为您提供给定键最新值 .因此,如果您希望在 table 中看到与源流中相同数量的消息,那么您应该具有相同数量的唯一键。

如果您看到的消息较少,则表明 ROOT 不是唯一的。

根据您建模的问题,您应该:

  • (a) 使用流而不是 Table,或
  • (b) 更改您正在使用的密钥

参考: