How to select a subvalue of JSON in a topic as a ksql stream
I have many topics in Kafka with the following format:
value: {big json string with many subkeys etc}.
Printing the topic shows:
rowtime: 3/10/20 7:10:43 AM UTC, key: , value: {"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }
I have tried using
CREATE STREAM test
(value STRUCT<
server_name VARCHAR,
remote_address VARCHAR,
forwarded_for VARCHAR,
remote_user VARCHAR,
timestamp_start VARCHAR
..
WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');
But I get a stream in which every value is NULL.
Is there a way to grab the fields under the value key?
The escaped JSON is not valid JSON, which has probably made this more difficult :)
In this snippet:
…\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\…
the leading double quote of o_name is not escaped. You can validate this with something like jq:
echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
parse error: Invalid numeric literal at line 1, column 685
With the JSON fixed, it parses successfully:
echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
{
"server_name": "xxxxxxxxxxxxxxx",
"remote_address": "10.x.x.x",
"user": "xxxxxx",
"timestamp_start": "xxxxxxxx",
"timestamp_finish": "xxxxxxxxxx",
"time_start": "10/Mar/2020:07:10:39 +0000",
"time_finish": "10/Mar/2020:07:10:39 +0000",
"request_method": "PUT",
"request_uri": "xxxxxxxxxxxxxxxxxxxxxxx",
"protocol": "HTTP/1.1",
"status": 200,
…
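If jq is not to hand, the same check can be sketched in Python with only the standard library. This is a rough illustration, not part of ksqlDB: the helper name parse_embedded and the two shortened records are my own, modelled on the record in the question. It parses the outer record, then the JSON string held in message, and reports where parsing stopped.

```python
import json
from typing import Optional, Tuple


def parse_embedded(raw: str, field: str = "message") -> Tuple[Optional[dict], Optional[str]]:
    """Parse the outer record, then the JSON string stored in `field`.

    Returns (parsed_inner, None) on success, or (None, description) on failure.
    """
    try:
        outer = json.loads(raw)
        return json.loads(outer[field]), None
    except json.JSONDecodeError as err:
        # err.doc is the text being parsed; err.pos is the offset where parsing stopped.
        context = err.doc[err.pos:err.pos + 12]
        return None, f"{err.msg} at offset {err.pos}, near {context!r}"


# Shortened, hypothetical records modelled on the one in the question.
good = r'{"message": "{\"object_length\":\"0\",\"o_name\":\"xxxxx\"}"}'
bad = r'{"message": "{\"object_length\":\"0\","o_name\":\"xxxxx\"}"}'

print(parse_embedded(good))
print(parse_embedded(bad))
```

For the bad record the outer parse itself fails, because the unescaped quote ends the message string early. That is the same reason the whole row, and not just the message column, comes back NULL.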
So now let's get this into ksqlDB. I'm using kafkacat to load it into a topic:
kafkacat -b localhost:9092 -t testing -P<<EOF
{ "@timestamp": "XXXXXXXX", "beat": { "hostname": "xxxxxxxxxx", "name": "xxxxxxxxxx", "version": "5.2.1" }, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }
EOF
Now, using ksqlDB, let's declare the outline schema, in which the message field is just a lump of VARCHAR:
CREATE STREAM TEST (BEAT STRUCT<HOSTNAME VARCHAR, NAME VARCHAR, VERSION VARCHAR>,
INPUT_TYPE VARCHAR,
MESSAGE VARCHAR,
OFFSET BIGINT,
SOURCE VARCHAR)
WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');
We can query this stream to check that it's working:
SET 'auto.offset.reset' = 'earliest';
SELECT BEAT->HOSTNAME,
BEAT->VERSION,
SOURCE,
MESSAGE
FROM TEST
EMIT CHANGES LIMIT 1;
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|BEAT__HOSTNAME |BEAT__VERSION |SOURCE |MESSAGE |
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|xxxxxxxxxx |5.2.1 |/var/log/xxxx.log |{"server_name":"xxxxxxxxxxxxxxx","remote_address":"10.x.x.x","user":|
| | | |"xxxxxx","timestamp_start":"xxxxxxxx","timestamp_finish":"xxxxxxxxxx|
| | | |","time_start":"10/Mar/2020:07:10:39 +0000","time_finish":"10/Mar/20|
| | | |20:07:10:39 +0000","request_method":"PUT","request_uri":"xxxxxxxxxxx|
| | | |xxxxxxxxxxxx","protocol":"HTTP/1.1","status":200,"response_length":"|
| | | |0","request_length":"0","user_agent":"xxxxxxxxx","request_id":"zzzzz|
| | | |zzzzzzzzzzzzzzzz","request_type":"zzzzzzzz","stat":{"c_wait":0.004,"|
| | | |s_wait":0.432,"digest":0.0,"commit":31.878,"turn_around_time":0.0,"t|
| | | |_transfer":32.319},"object_length":"0","o_name":"xxxxx","https":{"pr|
| | | |otocol":"TLSv1.2","cipher_suite":"TLS_RSA_WITH_AES_256_GCM_SHA384"},|
| | | |"principals":{"identity":"zzzzzz","asv":"dddddddddd"},"type":"http",|
| | | |"format":1} |
Limit Reached
Query terminated
Now let's extract the embedded JSON fields using the EXTRACTJSONFIELD function (I've not done every field, just a handful of them to illustrate the pattern to follow):
SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address') AS REMOTE_ADDRESS,
EXTRACTJSONFIELD(MESSAGE,'$.time_start') AS TIME_START,
EXTRACTJSONFIELD(MESSAGE,'$.protocol') AS PROTOCOL,
EXTRACTJSONFIELD(MESSAGE,'$.status') AS STATUS,
EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait') AS STAT_C_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait') AS STAT_S_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.digest') AS STAT_DIGEST,
EXTRACTJSONFIELD(MESSAGE,'$.stat.commit') AS STAT_COMMIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer') AS STAT_T_TRANSFER
FROM TEST
EMIT CHANGES LIMIT 1;
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|REMOTE_ADDRESS |TIME_START |PROTOCOL |STATUS |STAT_C_WAIT |STAT_S_WAIT |STAT_DIGEST |STAT_COMMIT |STAT_TURN_AROUND_TIME |STAT_T_TRANSFER |
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|10.x.x.x |10/Mar/2020:07:10:39 +0000|HTTP/1.1 |200 |0.004 |0.432 |0 |31.878 |0 |32.319 |
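To make the path syntax concrete, here is a minimal Python sketch of the same idea: walk a '$.a.b' style path through a JSON string and hand the result back as text. It is only an approximation for illustration, not ksqlDB's implementation. The function name extract_json_field and the shortened message are assumptions of mine, it handles plain dotted keys only, and its number formatting may differ from what ksqlDB prints.

```python
import json
from typing import Optional


def extract_json_field(doc: str, path: str) -> Optional[str]:
    """Return the value at a '$.key.subkey' path as a string, or None if absent."""
    if not path.startswith("$."):
        raise ValueError("only simple '$.key.subkey' paths are handled in this sketch")
    node = json.loads(doc)
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    # Strings come back as-is; everything else is rendered as JSON text.
    return node if isinstance(node, str) else json.dumps(node)


# Shortened, hypothetical version of the embedded message.
message = '{"remote_address":"10.x.x.x","status":200,"stat":{"c_wait":0.004,"s_wait":0.432}}'

for path in ("$.remote_address", "$.status", "$.stat.c_wait", "$.stat.missing"):
    print(path, "->", extract_json_field(message, path))
```

Note that numbers such as status come back as text here too, which mirrors the VARCHAR columns you get from EXTRACTJSONFIELD.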
We can persist this to a new Kafka topic, and for good measure reserialise it to Avro to make it easier for downstream applications to use:
CREATE STREAM BEATS WITH (VALUE_FORMAT='AVRO') AS
SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address') AS REMOTE_ADDRESS,
EXTRACTJSONFIELD(MESSAGE,'$.time_start') AS TIME_START,
EXTRACTJSONFIELD(MESSAGE,'$.protocol') AS PROTOCOL,
EXTRACTJSONFIELD(MESSAGE,'$.status') AS STATUS,
EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait') AS STAT_C_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait') AS STAT_S_WAIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.digest') AS STAT_DIGEST,
EXTRACTJSONFIELD(MESSAGE,'$.stat.commit') AS STAT_COMMIT,
EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer') AS STAT_T_TRANSFER
FROM TEST
EMIT CHANGES;
ksql> DESCRIBE BEATS;
Name : BEATS
Field | Type
---------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
REMOTE_ADDRESS | VARCHAR(STRING)
TIME_START | VARCHAR(STRING)
PROTOCOL | VARCHAR(STRING)
STATUS | VARCHAR(STRING)
STAT_C_WAIT | VARCHAR(STRING)
STAT_S_WAIT | VARCHAR(STRING)
STAT_DIGEST | VARCHAR(STRING)
STAT_COMMIT | VARCHAR(STRING)
STAT_TURN_AROUND_TIME | VARCHAR(STRING)
STAT_T_TRANSFER | VARCHAR(STRING)
---------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
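Every extracted column is a VARCHAR, so anything that wants numbers has to convert them. Within ksqlDB, wrapping each EXTRACTJSONFIELD call in CAST (for example AS INT or AS DOUBLE) is one way to do that in the stream definition. If the conversion is left to a downstream consumer instead, it could look roughly like this Python sketch; the SCHEMA mapping, the coerce_row helper and the sample row are hypothetical, chosen to match the column names above.

```python
from typing import Callable, Dict, Optional

# Hypothetical target types for some of the BEATS columns; unlisted columns stay strings.
SCHEMA: Dict[str, Callable[[str], object]] = {
    "STATUS": int,
    "STAT_C_WAIT": float,
    "STAT_S_WAIT": float,
    "STAT_COMMIT": float,
}


def coerce_row(row: Dict[str, Optional[str]],
               schema: Dict[str, Callable[[str], object]]) -> Dict[str, object]:
    """Convert string columns to the types named in `schema`.

    Missing (None) values are kept as None, and values that fail to convert
    become None rather than raising.
    """
    typed: Dict[str, object] = {}
    for name, value in row.items():
        convert = schema.get(name)
        if value is None or convert is None:
            typed[name] = value
            continue
        try:
            typed[name] = convert(value)
        except ValueError:
            typed[name] = None
    return typed


# Sample row with every value as a string, as in the DESCRIBE output above.
row = {"REMOTE_ADDRESS": "10.x.x.x", "STATUS": "200",
       "STAT_C_WAIT": "0.004", "STAT_COMMIT": "31.878"}
print(coerce_row(row, SCHEMA))
```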
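To debug issues where ksqlDB returns NULLs, check out this article. A lot of the time it comes down to serialisation errors. For example, if you look at the ksqlDB server log, you'll see this error from when it tried to parse the badly-formed escaped JSON before I fixed it: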
WARN Exception caught during Deserialization, taskId: 0_0, topic: testing, partition: 0, offset: 1 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: mvn value from topic: testing
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('o' (code 111)): was expecting comma to separate Object entries
at [Source: (byte[])"{"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\
"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HT"[truncated 604 bytes];
line: 1, column: 827]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:986)
…