Kafka ksql simple join does not work
I re-keyed the data in both the stream and the table. I'm using Confluent 4.1.
1) Create the stream
CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');
2) Create a re-keyed stream. This script does not work now, although it worked before. Why?
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM partition by root;
So I created the following scripts instead:
CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update partition by root;
The output of session_details_stream_rekeyed looks correct:
ksql> select * from session_details_stream_rekeyed;
1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001
3) Create streams for the voipDetails topic:
CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream partition by SessionIdTime;
CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update partition by root;
ksql> select * from voip_details_stream_rekeyed6;
1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001
4) Create a table
CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');
ksql> select * from voip_details_table;
1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001
5) Then I run a left join
select c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root = u.root;
1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null
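What is the problem?
tl;dr: In a stream-table join, the table message must already exist (with an earlier timestamp) before the stream message is processed. If you re-send the source stream messages after the table topic has been populated, the join succeeds.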
Example data
Populate the topics using kafkacat (pasting the data into stdin works too):
cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails /tmp/msgs
cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails /tmp/msgs
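If you prefer to generate the test files from a script rather than a heredoc, a minimal Python sketch follows. The helper name `to_ndjson` is hypothetical; it emits one compact JSON object per line (kafkacat's default message delimiter is a newline), using compact separators so each line matches the messages above exactly.

```python
import json


def to_ndjson(records):
    """Serialize each dict as one compact JSON object per line (newline-delimited JSON)."""
    return "".join(json.dumps(r, separators=(",", ":")) + "\n" for r in records)


# Same payloads as the heredocs above; dict insertion order is preserved in the output.
session_details = [
    {"Media": "Foo", "SessionIdTime": "2018-05-17 11:25:33 BST", "SessionIdSeq": 1},
    {"Media": "Foo", "SessionIdTime": "2018-05-17 11:26:33 BST", "SessionIdSeq": 2},
]
voip_details = [
    {"SessionIdTime": "2018-05-17 11:25:33 BST", "SessionIdSeq": 1, "Details": "Bar1a"},
    {"SessionIdTime": "2018-05-17 11:25:33 BST", "SessionIdSeq": 1, "Details": "Bar1b"},
    {"SessionIdTime": "2018-05-17 11:26:33 BST", "SessionIdSeq": 2, "Details": "Bar2"},
]
```

For example, write `to_ndjson(voip_details)` to `/tmp/msgs` and then run the same `kafkacat ... -P -t voipDetails /tmp/msgs` command.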
Verify the topic contents:
Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
Declare the source streams
ksql> CREATE STREAM session_details_stream \
(Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
(SessionIdTime varchar,SessionIdSeq long, Details varchar) \
WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');
Message
----------------
Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated
Repartition each topic on SessionIdTime+SessionIdSeq
ksql> CREATE STREAM SESSION AS \
SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
FROM session_details_stream \
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------
ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo
ksql> CREATE STREAM VOIP AS \
SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
FROM voip_details_stream \
PARTITION BY root;
Message
----------------------------
Stream created and running
----------------------------
ksql>
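The new key is the plain string concatenation of the two fields, with the BIGINT `SessionIdSeq` appended as text (as the ROWKEY values above show). Both sides of the join must derive byte-identical keys. A Python sketch that mirrors this key derivation; `join_key` and `rekey` are illustrative names, not KSQL functions:

```python
def join_key(session_id_time: str, session_id_seq: int) -> str:
    # Mirrors CONCAT(SessionIdTime, SessionIdSeq): the sequence number is
    # appended as text with no separator.
    return f"{session_id_time}{session_id_seq}"


def rekey(records):
    """Return (key, value) pairs keyed by the derived join key, like PARTITION BY root."""
    return [(join_key(r["SessionIdTime"], r["SessionIdSeq"]), r) for r in records]
```

One design caveat: with no separator, different inputs can collide (for example `"x1"` + `2` and `"x"` + `12` both give `"x12"`). A delimiter inside the CONCAT would avoid ambiguous keys.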
Declare the table
ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');
Message
---------------
Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b
Join the SESSION stream to the VOIP table
ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
FROM SESSION s \
LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null
Leave the JOIN query above running. Re-send the SESSION messages to the source topic (use kafkacat to send the same messages to sessionDetails, as above):
1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
Per Rohan Desai on the Confluent Community Slack:
The problem is that the rowtime of the record from your stream is earlier than the rowtime of the record in your table that you expect it to join with. So when the stream record is processed there is no corresponding record in the table
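The behaviour Rohan describes can be illustrated with a deliberately simplified Python model: all input records are processed in ROWTIME order, table records upsert into a key/value map, and each stream record looks up whatever the table holds at that moment. This is an assumption-laden sketch, not Kafka Streams' actual scheduler (which chooses the next record per partition on a best-effort timestamp basis and also depends on buffering). The `left_join` function and the `Event` tuple layout are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

# (rowtime_ms, source, key, value); source is "stream" or "table"
Event = Tuple[int, str, str, str]


def left_join(events: List[Event]) -> List[Tuple[int, str, str, Optional[str]]]:
    """Simplified stream-table LEFT JOIN: a stream record only sees table
    updates whose rowtime was processed before its own."""
    table: Dict[str, str] = {}
    out = []
    # sorted() is stable, so records with equal rowtime keep their input order
    for rowtime, source, key, value in sorted(events, key=lambda e: e[0]):
        if source == "table":
            table[key] = value  # upsert: latest value per key wins
        else:
            out.append((rowtime, key, value, table.get(key)))  # None plays KSQL's null
    return out


# ROWTIMEs taken from the session and VOIP output in this answer (key ...BST2 only)
events = [
    (1526553130865, "stream", "2018-05-17 11:26:33 BST2", "Foo"),
    (1526553143176, "table", "2018-05-17 11:26:33 BST2", "Bar2"),
    (1526553988639, "stream", "2018-05-17 11:26:33 BST2", "Foo"),  # the re-sent message
]
```

Under this model the first session record finds nothing (null) and the re-sent one finds `Bar2`, matching the two join results in the transcript.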
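Look at the message on the source table, using ROWTIME to see the message timestamp (not to be confused with root, the join key built from the SessionIdTime value):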
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2
Compare this with the messages on the source session stream topic:
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo
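ROWTIME is epoch milliseconds, and the formatted column above appears to be rendered in the server's local zone. Judging by the output, that was UTC+1 (BST); this is an assumption, not something the transcript states. A small Python sketch for checking such timestamps by hand; `rowtime_to_string` is a hypothetical helper, only roughly equivalent to `TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss')`:

```python
from datetime import datetime, timedelta, timezone

# Assumption: the KSQL server's zone was UTC+1 (BST) in May 2018.
BST = timezone(timedelta(hours=1), "BST")


def rowtime_to_string(rowtime_ms: int, tz: timezone = BST) -> str:
    """Format an epoch-millisecond ROWTIME as 'yyyy-MM-dd HH:mm:ss' in the given zone."""
    # Drop the milliseconds; the format string does not show them anyway.
    return datetime.fromtimestamp(rowtime_ms // 1000, tz).strftime("%Y-%m-%d %H:%M:%S")
```

Because the comparison that matters is numeric, it is often simpler to compare raw ROWTIMEs directly: 1526553130865 < 1526553143176 < 1526553988639.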
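The first of these (11:32:10 / 1526553130865) precedes the corresponding VOIP message (11:32:23 / 1526553143176, shown above), and produced the null join result we saw first. The second, dated later (11:46:28 / 1526553988639), produced the successful join we saw subsequently: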
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2