Kafka ksql 简单连接不起作用

Kafka ksql simple join does not work

我在流中重新输入了数据,table我使用 Confluent 4.1

1) 创建流

   CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');

2) 创建重新加密的流,因为这个脚本不起作用,但在此之前它起作用了,为什么?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;

然后我创建下一个脚本 s

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update  partition by root;

session_details_stream_rekeyed 的结果正常:

ksql> select * from session_details_stream_rekeyed;
      1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001

3) 为主题创建流;

 CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
 CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream  partition by SessionIdTime;
 CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update  partition by root;



 ksql> select * from voip_details_stream_rekeyed6;
      1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

4) 创建一个 table

 CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');

 ksql> select * from voip_details_table;

       1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

5) 然后我创建一个左连接

select  c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root  = u.root;

   1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null

问题出在哪里?

tl;dr 进行流-table 加入时,您的 table 消息必须已经存在(并且必须带有时间戳) 流消息之前。如果您重新发送源流消息,在填充 table 主题后,加入将成功。

示例数据

使用 kafkacat 填充主题(将数据粘贴到 stdin

cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails /tmp/msgs


cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails /tmp/msgs

验证主题内容:

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}

声明源流

ksql> CREATE STREAM session_details_stream \
      (Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
      WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
      (SessionIdTime varchar,SessionIdSeq long, Details varchar) \
      WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');

 Message
----------------
 Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated

在 SessionIdTime+SessionIdSeq 上对每个主题重新分区

ksql> CREATE STREAM SESSION AS \
      SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
      FROM session_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------


ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo


ksql> CREATE STREAM VOIP AS \
      SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
      FROM voip_details_stream \
      PARTITION BY root;

 Message
----------------------------
 Stream created and running
----------------------------
ksql>

声明table

ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
      WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');

 Message
---------------
 Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b

将 SESSION 流加入 VOIP table

ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
      FROM SESSION s \
      LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null

保留上面的 JOIN 查询 运行。将 SESSION 消息重新发送到源主题(使用 kafkacat 将相同的消息发送到 sessionDetails,如上):

1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2

Per Rohan Desai 在 Confluent Community Slack:

The problem is that the rowtime of the record from your stream is earlier than the rowtime of the record in your table that you expect it to join with. So when the stream record is processed there is no corresponding record in the table

查看源 table 上的消息,使用 ROWTIME 查看消息时间戳( 不要与基于时间戳的连接键混淆root):

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2

将此与源会话流主题上的消息进行比较:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo

其中的 first(在 11:32:10 / 1526553130865)先于相应的 VOIP 消息(如上所示) , 并产生了我们第一次看到的 null 连接结果。这些second之后的日期(11:46:28/1526553988639)产生我们随后看到的成功连接:

1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2