需要根据某个关键字过滤掉Kafka Records
Need to filter out Kafka Records based on a certain keyword
我有一个 Kafka 主题,它有大约 300 万条记录。我想从中挑选出一条具有特定参数的记录。我一直在尝试使用 Lenses 对此进行查询,但无法形成正确的查询。以下是1条消息的记录内容
{
"header": {
"schemaVersionNo": "1",
},
"payload": {
"modifiedDate": 1552334325212,
"createdDate": 1552334325212,
"createdBy": "A",
"successful": true,
"source_order_id": "1111111111111",
}
}
现在我想过滤掉具有特定 source_order_id 的记录,但无法找出正确的方法。
我们也尝试过通过镜头以及 Kafka 工具。
我们在镜头中尝试的示例查询如下:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='A'
此查询有效,但是如果我们尝试使用如下所示的源 ID,我们会收到错误消息:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='1111111111111'
Error : "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING.
通过自定义消费者消费所有 300 万条记录,然后对其进行迭代对我来说似乎不是一种优化方法,因此为此类用例寻找任何可用的解决方案。
既然你说你对其他解决方案持开放态度,这里有一个使用 KSQL 构建的解决方案。
首先,让我们将一些示例记录放入源主题中:
$ kafkacat -P -b localhost:9092 -t TEST <<EOF
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325212, "createdDate": 1552334325212, "createdBy": "A", "successful": true, "source_order_id": "3411976933214" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325412, "createdDate": 1552334325412, "createdBy": "B", "successful": true, "source_order_id": "3411976933215" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325612, "createdDate": 1552334325612, "createdBy": "C", "successful": true, "source_order_id": "3411976933216" } }
EOF
使用 KSQL 我们可以检查主题 PRINT
:
ksql> PRINT 'TEST' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325212,"createdDate":1552334325212,"createdBy":"A","successful":true,"source_order_id":"3411976933214"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325412,"createdDate":1552334325412,"createdBy":"B","successful":true,"source_order_id":"3411976933215"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325612,"createdDate":1552334325612,"createdBy":"C","successful":true,"source_order_id":"3411976933216"}}
然后声明关于该主题的架构,这使我们能够 运行 SQL 反对它:
ksql> CREATE STREAM TEST (header STRUCT<schemaVersionNo VARCHAR>,
payload STRUCT<modifiedDate BIGINT,
createdDate BIGINT,
createdBy VARCHAR,
successful BOOLEAN,
source_order_id VARCHAR>)
WITH (KAFKA_TOPIC='TEST',
VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
告诉 KSQL 使用主题中的所有数据:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
现在我们可以select所有数据:
ksql> SELECT * FROM TEST;
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325412, CREATEDDATE=1552334325412, CREATEDBY=B, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933215}
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
^CQuery terminated
或者我们可以 select 主动查询它,使用 ->
表示法访问架构中的嵌套字段:
ksql> SELECT * FROM TEST
WHERE PAYLOAD->CREATEDBY='A';
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
除了 select 所有记录,您还可以 return 只关注感兴趣的字段:
ksql> SELECT payload FROM TEST
WHERE PAYLOAD->source_order_id='3411976933216';
{MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
使用 KSQL 您可以将任何 SELECT
语句的结果写入一个新主题,该主题将使用所有现有消息以及源主题上的每条新消息填充它声明的 SELECT
语句:
ksql> CREATE STREAM TEST_CREATED_BY_A AS
SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';
Message
----------------------------
Stream created and running
----------------------------
在 Kafka 集群上列出主题:
ksql> SHOW TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
orders | true | 1 | 1 | 1 | 1
pageviews | false | 1 | 1 | 0 | 0
products | true | 1 | 1 | 1 | 1
TEST | true | 1 | 1 | 1 | 1
TEST_CREATED_BY_A | true | 4 | 1 | 0 | 0
打印新话题的内容:
ksql> PRINT 'TEST_CREATED_BY_A' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552475910106,"ROWKEY":"null","HEADER":{"SCHEMAVERSIONNO":"1"},"PAYLOAD":{"MODIFIEDDATE":1552334325212,"CREATEDDATE":1552334325212,"CREATEDBY":"A","SUCCESSFUL":true,"SOURCE_ORDER_ID":"3411976933214"}}
我有一个 Kafka 主题,它有大约 300 万条记录。我想从中挑选出一条具有特定参数的记录。我一直在尝试使用 Lenses 对此进行查询,但无法形成正确的查询。以下是1条消息的记录内容
{
"header": {
"schemaVersionNo": "1",
},
"payload": {
"modifiedDate": 1552334325212,
"createdDate": 1552334325212,
"createdBy": "A",
"successful": true,
"source_order_id": "1111111111111",
}
}
现在我想过滤掉具有特定 source_order_id 的记录,但无法找出正确的方法。 我们也尝试过通过镜头以及 Kafka 工具。
我们在镜头中尝试的示例查询如下:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='A'
此查询有效,但是如果我们尝试使用如下所示的源 ID,我们会收到错误消息:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='1111111111111'
Error : "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING.
通过自定义消费者消费所有 300 万条记录,然后对其进行迭代对我来说似乎不是一种优化方法,因此为此类用例寻找任何可用的解决方案。
既然你说你对其他解决方案持开放态度,这里有一个使用 KSQL 构建的解决方案。
首先,让我们将一些示例记录放入源主题中:
$ kafkacat -P -b localhost:9092 -t TEST <<EOF
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325212, "createdDate": 1552334325212, "createdBy": "A", "successful": true, "source_order_id": "3411976933214" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325412, "createdDate": 1552334325412, "createdBy": "B", "successful": true, "source_order_id": "3411976933215" } }
{ "header": { "schemaVersionNo": "1" }, "payload": { "modifiedDate": 1552334325612, "createdDate": 1552334325612, "createdBy": "C", "successful": true, "source_order_id": "3411976933216" } }
EOF
使用 KSQL 我们可以检查主题 PRINT
:
ksql> PRINT 'TEST' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325212,"createdDate":1552334325212,"createdBy":"A","successful":true,"source_order_id":"3411976933214"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325412,"createdDate":1552334325412,"createdBy":"B","successful":true,"source_order_id":"3411976933215"}}
{"ROWTIME":1552476232988,"ROWKEY":"null","header":{"schemaVersionNo":"1"},"payload":{"modifiedDate":1552334325612,"createdDate":1552334325612,"createdBy":"C","successful":true,"source_order_id":"3411976933216"}}
然后声明关于该主题的架构,这使我们能够 运行 SQL 反对它:
ksql> CREATE STREAM TEST (header STRUCT<schemaVersionNo VARCHAR>,
payload STRUCT<modifiedDate BIGINT,
createdDate BIGINT,
createdBy VARCHAR,
successful BOOLEAN,
source_order_id VARCHAR>)
WITH (KAFKA_TOPIC='TEST',
VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
告诉 KSQL 使用主题中的所有数据:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
现在我们可以select所有数据:
ksql> SELECT * FROM TEST;
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325412, CREATEDDATE=1552334325412, CREATEDBY=B, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933215}
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
^CQuery terminated
或者我们可以 select 主动查询它,使用 ->
表示法访问架构中的嵌套字段:
ksql> SELECT * FROM TEST
WHERE PAYLOAD->CREATEDBY='A';
1552475910106 | null | {SCHEMAVERSIONNO=1} | {MODIFIEDDATE=1552334325212, CREATEDDATE=1552334325212, CREATEDBY=A, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933214}
除了 select 所有记录,您还可以 return 只关注感兴趣的字段:
ksql> SELECT payload FROM TEST
WHERE PAYLOAD->source_order_id='3411976933216';
{MODIFIEDDATE=1552334325612, CREATEDDATE=1552334325612, CREATEDBY=C, SUCCESSFUL=true, SOURCE_ORDER_ID=3411976933216}
使用 KSQL 您可以将任何 SELECT
语句的结果写入一个新主题,该主题将使用所有现有消息以及源主题上的每条新消息填充它声明的 SELECT
语句:
ksql> CREATE STREAM TEST_CREATED_BY_A AS
SELECT * FROM TEST WHERE PAYLOAD->CREATEDBY='A';
Message
----------------------------
Stream created and running
----------------------------
在 Kafka 集群上列出主题:
ksql> SHOW TOPICS;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------------------------
orders | true | 1 | 1 | 1 | 1
pageviews | false | 1 | 1 | 0 | 0
products | true | 1 | 1 | 1 | 1
TEST | true | 1 | 1 | 1 | 1
TEST_CREATED_BY_A | true | 4 | 1 | 0 | 0
打印新话题的内容:
ksql> PRINT 'TEST_CREATED_BY_A' FROM BEGINNING;
Format:JSON
{"ROWTIME":1552475910106,"ROWKEY":"null","HEADER":{"SCHEMAVERSIONNO":"1"},"PAYLOAD":{"MODIFIEDDATE":1552334325212,"CREATEDDATE":1552334325212,"CREATEDBY":"A","SUCCESSFUL":true,"SOURCE_ORDER_ID":"3411976933214"}}