How to select subvalue of json in topic as ksql stream

I have a number of topics in Kafka with the following format: value: {big json string with many subkeys etc}.

Printing the topic looks like this:

rowtime: 3/10/20 7:10:43 AM UTC, key: , value: {"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }

I have tried using:

CREATE STREAM test
 (value STRUCT<
    server_name VARCHAR,
    remote_address VARCHAR,
    forwarded_for VARCHAR,
    remote_user VARCHAR,
    timestamp_start VARCHAR
 ..

WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');

But I get a stream with NULL values. Is there a way to grab the fields nested under the value key?

The escaped JSON isn't valid JSON, which is probably going to make this more difficult :)

In this snippet:

…\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\…

the leading double quote of o_name is not escaped. You can validate this with something like jq:

echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
parse error: Invalid numeric literal at line 1, column 685
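The same check can be sketched with Python's standard library json module; this is a minimal illustration using an abbreviated payload containing just the broken span (the names broken, fixed, and parses are invented here for the example):

```python
import json

# The embedded "message" value is itself a JSON string, so its inner quotes
# must be escaped. The quote before o_name is not, which terminates the
# string early and makes the outer document invalid.
broken = '{"message": "{\\"object_length\\":\\"0\\","o_name\\":\\"xxxxx\\"}"}'
fixed  = '{"message": "{\\"object_length\\":\\"0\\",\\"o_name\\":\\"xxxxx\\"}"}'

def parses(s: str) -> bool:
    try:
        json.loads(s)
        return True
    except json.JSONDecodeError:
        return False

print(parses(broken))  # False - the stray unescaped quote breaks the document
print(parses(fixed))   # True  - escaping it restores valid JSON
```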

After fixing the JSON, it parses successfully:

➜ echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_m
ethod\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_
wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddd
dddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
{
  "server_name": "xxxxxxxxxxxxxxx",
  "remote_address": "10.x.x.x",
  "user": "xxxxxx",
  "timestamp_start": "xxxxxxxx",
  "timestamp_finish": "xxxxxxxxxx",
  "time_start": "10/Mar/2020:07:10:39 +0000",
  "time_finish": "10/Mar/2020:07:10:39 +0000",
  "request_method": "PUT",
  "request_uri": "xxxxxxxxxxxxxxxxxxxxxxx",
  "protocol": "HTTP/1.1",
  "status": 200,
…

So now let's get this into ksqlDB. I'm using kafkacat to load it into a topic:

kafkacat -b localhost:9092 -t testing -P<<EOF
{ "@timestamp": "XXXXXXXX", "beat": { "hostname": "xxxxxxxxxx", "name": "xxxxxxxxxx", "version": "5.2.1" }, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }
EOF

Now in ksqlDB let's declare the outline schema, in which the message field is just a lump of VARCHAR:

CREATE STREAM TEST (BEAT STRUCT<HOSTNAME VARCHAR, NAME VARCHAR, VERSION VARCHAR>,
                    INPUT_TYPE VARCHAR, 
                    MESSAGE VARCHAR, 
                    OFFSET BIGINT, 
                    SOURCE VARCHAR) 
            WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');

We can query this stream to check that it's working:

SET 'auto.offset.reset' = 'earliest';
SELECT BEAT->HOSTNAME, 
       BEAT->VERSION, 
       SOURCE, 
       MESSAGE 
  FROM TEST 
EMIT CHANGES LIMIT 1;
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|BEAT__HOSTNAME   |BEAT__VERSION  |SOURCE              |MESSAGE                                                             |
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|xxxxxxxxxx       |5.2.1          |/var/log/xxxx.log   |{"server_name":"xxxxxxxxxxxxxxx","remote_address":"10.x.x.x","user":|
|                 |               |                    |"xxxxxx","timestamp_start":"xxxxxxxx","timestamp_finish":"xxxxxxxxxx|
|                 |               |                    |","time_start":"10/Mar/2020:07:10:39 +0000","time_finish":"10/Mar/20|
|                 |               |                    |20:07:10:39 +0000","request_method":"PUT","request_uri":"xxxxxxxxxxx|
|                 |               |                    |xxxxxxxxxxxx","protocol":"HTTP/1.1","status":200,"response_length":"|
|                 |               |                    |0","request_length":"0","user_agent":"xxxxxxxxx","request_id":"zzzzz|
|                 |               |                    |zzzzzzzzzzzzzzzz","request_type":"zzzzzzzz","stat":{"c_wait":0.004,"|
|                 |               |                    |s_wait":0.432,"digest":0.0,"commit":31.878,"turn_around_time":0.0,"t|
|                 |               |                    |_transfer":32.319},"object_length":"0","o_name":"xxxxx","https":{"pr|
|                 |               |                    |otocol":"TLSv1.2","cipher_suite":"TLS_RSA_WITH_AES_256_GCM_SHA384"},|
|                 |               |                    |"principals":{"identity":"zzzzzz","asv":"dddddddddd"},"type":"http",|
|                 |               |                    |"format":1}                                                         |
Limit Reached
Query terminated

Now let's use the EXTRACTJSONFIELD function to pull out the embedded JSON fields (I haven't done every field, just a handful of them to illustrate the pattern to follow):

SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address')        AS REMOTE_ADDRESS,
       EXTRACTJSONFIELD(MESSAGE,'$.time_start')            AS TIME_START,
       EXTRACTJSONFIELD(MESSAGE,'$.protocol')              AS PROTOCOL,
       EXTRACTJSONFIELD(MESSAGE,'$.status')                AS STATUS,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait')           AS STAT_C_WAIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait')           AS STAT_S_WAIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.digest')           AS STAT_DIGEST,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.commit')           AS STAT_COMMIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer')       AS STAT_T_TRANSFER 
  FROM TEST 
EMIT CHANGES LIMIT 1;
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|REMOTE_ADDRESS  |TIME_START                |PROTOCOL  |STATUS  |STAT_C_WAIT |STAT_S_WAIT  |STAT_DIGEST |STAT_COMMIT |STAT_TURN_AROUND_TIME |STAT_T_TRANSFER |
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|10.x.x.x        |10/Mar/2020:07:10:39 +0000|HTTP/1.1  |200     |0.004       |0.432        |0           |31.878      |0                     |32.319          |
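To make it concrete what EXTRACTJSONFIELD is doing here, a rough Python analogue can be sketched (an illustration only: extract_json_field is a hypothetical helper supporting simple $.a.b dotted paths, not ksqlDB's actual implementation):

```python
import json

# Rough analogue of ksqlDB's EXTRACTJSONFIELD over a VARCHAR column:
# parse the string, then walk a simplified '$.a.b' dotted path.
def extract_json_field(message: str, path: str):
    value = json.loads(message)
    for key in path.lstrip("$.").split("."):
        value = value[key]
    return value

# Abbreviated version of the embedded MESSAGE payload from above
message = ('{"remote_address":"10.x.x.x","protocol":"HTTP/1.1","status":200,'
           '"stat":{"c_wait":0.004,"t_transfer":32.319}}')

print(extract_json_field(message, "$.remote_address"))  # 10.x.x.x
print(extract_json_field(message, "$.stat.c_wait"))     # 0.004
```

As with the ksqlDB function applied to a VARCHAR column, each call re-parses the whole string, which is one reason materializing the extracted fields into a new stream is worthwhile.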

We can persist this to a new Kafka topic, and for good measure reserialize it to Avro to make life easier for any downstream applications:

CREATE STREAM BEATS WITH (VALUE_FORMAT='AVRO') AS
    SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address')        AS REMOTE_ADDRESS,
        EXTRACTJSONFIELD(MESSAGE,'$.time_start')            AS TIME_START,
        EXTRACTJSONFIELD(MESSAGE,'$.protocol')              AS PROTOCOL,
        EXTRACTJSONFIELD(MESSAGE,'$.status')                AS STATUS,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait')           AS STAT_C_WAIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait')           AS STAT_S_WAIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.digest')           AS STAT_DIGEST,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.commit')           AS STAT_COMMIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer')       AS STAT_T_TRANSFER 
    FROM TEST 
    EMIT CHANGES;
ksql> DESCRIBE BEATS;

Name                 : BEATS
 Field                 | Type
---------------------------------------------------
 ROWTIME               | BIGINT           (system)
 ROWKEY                | VARCHAR(STRING)  (system)
 REMOTE_ADDRESS        | VARCHAR(STRING)
 TIME_START            | VARCHAR(STRING)
 PROTOCOL              | VARCHAR(STRING)
 STATUS                | VARCHAR(STRING)
 STAT_C_WAIT           | VARCHAR(STRING)
 STAT_S_WAIT           | VARCHAR(STRING)
 STAT_DIGEST           | VARCHAR(STRING)
 STAT_COMMIT           | VARCHAR(STRING)
 STAT_TURN_AROUND_TIME | VARCHAR(STRING)
 STAT_T_TRANSFER       | VARCHAR(STRING)
---------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

To debug problems with ksqlDB returning NULLs, check out this article. Often it boils down to serialization errors. For example, if you look at the ksqlDB server log, you'll see this error from when it tried to parse the malformed escaped JSON before I fixed it:

WARN Exception caught during Deserialization, taskId: 0_0, topic: testing, partition: 0, offset: 1 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: mvn value from topic: testing
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('o' (code 111)): was expecting comma to separate Object entries
 at [Source: (byte[])"{"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\
"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HT"[truncated 604 bytes];
 line: 1, column: 827]
   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:986)
…