KSQL - INSERT INTO a Stream yields no data
I'm having a problem reading messages from a stream that is populated by a KSQL INSERT INTO statement.
The steps I followed are:
I have a stream event_stream, created from a Kafka topic:
CREATE STREAM event_stream (eventType varchar, eventTime varchar,
sourceHostName varchar) WITH (kafka_topic='events', value_format='json');
SELECT * FROM event_stream;
shows the messages arriving correctly.
I want to send some of these messages to another topic in Kafka, output_events, which I have already created.
I then create a second stream in KSQL:
CREATE STREAM output_stream (eventTime varchar, extraColumn varchar,
sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Finally, I link the input to the output as follows:
INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn,
sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
All of the above appears to complete without errors, but if I run
SELECT * FROM output_stream;
I get no data. Why is that?
Running the SELECT part of the query above works fine, so I can see that matching results are arriving.
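That is, this returns the matching rows when run on its own (it is just the SELECT clause of the INSERT INTO above):
SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';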
Strangely, if I run DESCRIBE EXTENDED output_stream, the message count does suggest that messages are reaching the stream:
Local runtime statistics
------------------------
messages-per-sec: 0.33 total-messages: 86 last-message: 11/9/18 1:15:43 PM UTC
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)
I've also checked the ksql-server logs, but can't see any errors there.
This is a bug, caused by inadvertently using the wrong form of CREATE STREAM. You've used the variant that 'registers' a KSQL stream against an existing topic. For INSERT INTO to work, the target needs to have been created with CREATE STREAM target AS SELECT ("CSAS").
Let's work through this. Here I'm using this docker-compose for the test setup.
Populate some dummy data:
docker run --rm --interactive --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t events -P <<EOF
{"eventType":"1", "eventTime" :"2018-11-13-06:34:57", "sourceHostName":"asgard"}
{"eventType":"2", "eventTime" :"2018-11-13-06:35:57", "sourceHostName":"asgard"}
{"eventType":"MatchingValue", "eventTime" :"2018-11-13-06:35:58", "sourceHostName":"asgard"}
EOF
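If you want to double-check that the records landed, you can read them straight back with kafkacat in consumer mode (same broker address and Docker network assumed as above; -C consumes, -e exits at end of partition):
docker run --rm --network cos_default confluentinc/cp-kafkacat kafkacat -b kafka:29092 -t events -C -e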
Register the source topic with KSQL:
CREATE STREAM event_stream (eventType varchar, eventTime varchar, sourceHostName varchar) WITH (kafka_topic='events', value_format='json');
Query the stream:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT * FROM event_stream;
1542091084660 | null | 1 | 2018-11-13-06:34:57 | asgard
1542091084660 | null | 2 | 2018-11-13-06:35:57 | asgard
1542091785207 | null | MatchingValue | 2018-11-13-06:35:58 | asgard
So now look at the CREATE STREAM that you quoted:
CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
My guess is that if you run LIST TOPICS; you'll see that this topic already exists on your Kafka broker?
ksql> LIST TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
_schemas | false | 1 | 1 | 0 | 0
docker-connect-configs | false | 1 | 1 | 0 | 0
docker-connect-offsets | false | 25 | 1 | 0 | 0
docker-connect-status | false | 5 | 1 | 0 | 0
events | true | 1 | 1 | 0 | 0
output_events | false | 4 | 1 | 0 | 0
----------------------------------------------------------------------------------------------------
ksql>
Because if it didn't, this CREATE STREAM would fail:
ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Kafka topic does not exist: output_events
ksql>
So on that assumption, I create this topic on my test cluster too:
$ docker-compose exec kafka bash -c "kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 4 --topic output_events"
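As an optional sanity check (assuming the same kafka container and ZooKeeper address as the command above), you can confirm the topic and its four partitions exist:
$ docker-compose exec kafka bash -c "kafka-topics --describe --zookeeper zookeeper:2181 --topic output_events"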
And then create the stream:
ksql> CREATE STREAM output_stream (eventTime varchar, extraColumn varchar, sourceHostName varchar) WITH (kafka_topic='output_events', value_format='json');
Message
----------------
Stream created
----------------
Note that it says Stream created, and not Stream created and running.
Now run the INSERT INTO:
ksql> INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
Message
-------------------------------
Insert Into query is running.
-------------------------------
As you saw, the DESCRIBE EXTENDED output does show messages being processed:
ksql> DESCRIBE EXTENDED output_stream;
Name : OUTPUT_STREAM
Type : STREAM
Key field :
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : output_events (partitions: 4, replication: 1)
Field | Type
--------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
EVENTTIME | VARCHAR(STRING)
EXTRACOLUMN | VARCHAR(STRING)
SOURCEHOSTNAME | VARCHAR(STRING)
--------------------------------------------
Queries that write into this STREAM
-----------------------------------
InsertQuery_0 : INSERT INTO output_stream SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0.01 total-messages: 1 last-message: 11/13/18 6:49:46 AM UTC
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic output_events)
But the topic itself has no messages in it:
ksql> print 'output_events' from beginning;
^C
Nor does the KSQL stream:
ksql> SELECT * FROM OUTPUT_STREAM;
^CQuery terminated
So the INSERT INTO command is intended to run against an existing CSAS/CTAS target stream, not a source STREAM registered against an existing topic.
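The intended usage pattern is roughly this (a sketch with hypothetical stream names, assuming source_a and source_b have compatible schemas):
-- CSAS both registers the target stream and starts a query populating it
CREATE STREAM combined AS SELECT eventTime, sourceHostName FROM source_a;
-- INSERT INTO can then route rows from a second stream into that same target
INSERT INTO combined SELECT eventTime, sourceHostName FROM source_b;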
Let's try it a different way. First we need to drop the existing stream definition, and to do that, also terminate the INSERT INTO query:
ksql> DROP STREAM OUTPUT_STREAM;
Cannot drop OUTPUT_STREAM.
The following queries read from this source: [].
The following queries write into this source: [InsertQuery_0].
You need to terminate them before dropping OUTPUT_STREAM.
ksql> TERMINATE InsertQuery_0;
Message
-------------------
Query terminated.
-------------------
ksql> DROP STREAM OUTPUT_STREAM;
Message
------------------------------------
Source OUTPUT_STREAM was dropped.
------------------------------------
Now create the target stream:
ksql> CREATE STREAM output_stream WITH (kafka_topic='output_events') AS SELECT eventTime, 'Extra info' as extraColumn, sourceHostName FROM event_stream WHERE eventType = 'MatchingValue';
Message
----------------------------
Stream created and running
----------------------------
Note that on creating the stream it is also running (vs before, when it was just created). Now query the stream:
ksql> SELECT * FROM OUTPUT_STREAM;
1542091785207 | null | 2018-11-13-06:35:58 | Extra info | asgard
And check the underlying topic:
ksql> PRINT 'output_events' FROM BEGINNING;
Format:JSON
{"ROWTIME":1542091785207,"ROWKEY":"null","EVENTTIME":"2018-11-13-06:35:58","EXTRACOLUMN":"Extra info","SOURCEHOSTNAME":"asgard"}
So, you've hit a bug in KSQL (raised here), but fortunately it can be avoided entirely by using this simpler KSQL syntax, which combines your CREATE STREAM and INSERT INTO queries into one.