kafka-connect-elasticsearch: 如何根据Kafka主题header中的某个值删除文档
kafka-connect-elasticsearch: How to delete document based on certain value in header of the Kafka topic
我正在使用 Kafka Connect 加快速度。尝试使用 Kafka Connect Elasticsearch Service 接收器连接器将我们的数据从 Kafka 移动到 Elasticsearch。
我有一个看起来像这样的处理流:
来自s3的文件记录->来自发布到->kafka主题->Kafka连接->Elastic Search的源应用程序的自定义处理
这适用于 create/update 的场景。但是我们想要处理文件的删除场景。我们的应用程序为删除操作发布一个事件,并将其设置为 Kafka 消息中 header 值的一部分。我们不想使用此删除操作信息更新弹性文档,而是想删除文档本身。
我们如何使用 Kafka Connect 读取此 header 值并从 Elastic 中删除给定键的文档?
提前感谢您的帮助。
此致,
维卡斯
已编辑:
我尝试转换的消息示例:
[{
"key": "fileid=05ffefea-a71d-4bb7-091e-08d8f9229806",
"rownum": 0,
"metadata": {
"offset": 1468950,
"partition": 3,
"timestamp": 1617773161088,
"__keysize": 43,
"__valsize": 596
},
"headers": {
"sub-tenant-id": "",
"actiondate": "2021-04-07T05:26:01.0790010Z",
"action": "uploaded",
"contentversion": "V1"
},
"value": {
"id": "fil.05ffefeaa71d4bb7091e08d8f9229806",
"name": "4.txt",
"volumeId": "vol.e25196dc9e2f460bb27308d8f8405691",
"volumeName": "projdmck0405",
"type": "text/plain",
"subTenantId": "",
"path": "/4.txt",
"timeCreated": "2021-04-07T05:25:46.129Z",
"timeModified": "2021-04-07T05:25:46.129Z",
"urn": "urn:mycompany:product:test:app:file:fil.05ffefeaa71d4bb7091e08d8f9229806#/4.txt",
"sizeInBytes": 76,
"isUploaded": true,
"archiveStatus": "None",
"storageTier": "Standard",
"eTag": "11fb9ec5531d90d571b331cc39e43175"
}
}]
我正在尝试将 action
header field
和 value
添加到消息的 body 中。
"transforms" : "dropNullRecords,headerToField",
"transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
"transforms.headerToField.header.mappings" : "action:STRING:actioninbody"
我确实按照示例使用“action:STRING”的映射值进行了尝试,然后我注意到提到的格式为:
The format is <header name>:<header type>[:field name].
我错过了什么?
我做到了。
最终编写了自定义 SMT。使用 SMT,我可以访问连接记录,包括 header 和值。所以我只是一个一个地读取 header 值,当遇到我感兴趣的值时,我将连接记录的值设置为空。除此之外,Kafka Connect 还公开了以下参数:
behavior.on.null.values
How to handle records with a non-null key and a null value (for example, Kafka tombstone records). Valid options are IGNORE, DELETE, and FAIL.
Type: string
Default: FAIL
Valid Values: (case insensitive) [DELETE, IGNORE, FAIL]
Importance: low
我将值设置为 DELETE,它开始从 ES 索引中删除记录。
我遵循了 Confluent 中的自定义 SMT 示例:
https://github.com/confluentinc/kafka-connect-insert-uuid
对理解概念和连接记录 class 结构本身有很大帮助。
希望这对其他人有帮助。
我正在使用 Kafka Connect 加快速度。尝试使用 Kafka Connect Elasticsearch Service 接收器连接器将我们的数据从 Kafka 移动到 Elasticsearch。 我有一个看起来像这样的处理流:
来自s3的文件记录->来自发布到->kafka主题->Kafka连接->Elastic Search的源应用程序的自定义处理
这适用于 create/update 的场景。但是我们想要处理文件的删除场景。我们的应用程序为删除操作发布一个事件,并将其设置为 Kafka 消息中 header 值的一部分。我们不想使用此删除操作信息更新弹性文档,而是想删除文档本身。
我们如何使用 Kafka Connect 读取此 header 值并从 Elastic 中删除给定键的文档?
提前感谢您的帮助。
此致, 维卡斯
已编辑: 我尝试转换的消息示例:
[{
"key": "fileid=05ffefea-a71d-4bb7-091e-08d8f9229806",
"rownum": 0,
"metadata": {
"offset": 1468950,
"partition": 3,
"timestamp": 1617773161088,
"__keysize": 43,
"__valsize": 596
},
"headers": {
"sub-tenant-id": "",
"actiondate": "2021-04-07T05:26:01.0790010Z",
"action": "uploaded",
"contentversion": "V1"
},
"value": {
"id": "fil.05ffefeaa71d4bb7091e08d8f9229806",
"name": "4.txt",
"volumeId": "vol.e25196dc9e2f460bb27308d8f8405691",
"volumeName": "projdmck0405",
"type": "text/plain",
"subTenantId": "",
"path": "/4.txt",
"timeCreated": "2021-04-07T05:25:46.129Z",
"timeModified": "2021-04-07T05:25:46.129Z",
"urn": "urn:mycompany:product:test:app:file:fil.05ffefeaa71d4bb7091e08d8f9229806#/4.txt",
"sizeInBytes": 76,
"isUploaded": true,
"archiveStatus": "None",
"storageTier": "Standard",
"eTag": "11fb9ec5531d90d571b331cc39e43175"
}
}]
我正在尝试将 action
header field
和 value
添加到消息的 body 中。
"transforms" : "dropNullRecords,headerToField",
"transforms.headerToField.type" : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
"transforms.headerToField.header.mappings" : "action:STRING:actioninbody"
我确实按照示例使用“action:STRING”的映射值进行了尝试,然后我注意到提到的格式为:
The format is <header name>:<header type>[:field name].
我错过了什么?
我做到了。 最终编写了自定义 SMT。使用 SMT,我可以访问连接记录,包括 header 和值。所以我只是一个一个地读取 header 值,当遇到我感兴趣的值时,我将连接记录的值设置为空。除此之外,Kafka Connect 还公开了以下参数:
behavior.on.null.values
How to handle records with a non-null key and a null value (for example, Kafka tombstone records). Valid options are IGNORE, DELETE, and FAIL.
Type: string
Default: FAIL
Valid Values: (case insensitive) [DELETE, IGNORE, FAIL]
Importance: low
我将值设置为 DELETE,它开始从 ES 索引中删除记录。
我遵循了 Confluent 中的自定义 SMT 示例: https://github.com/confluentinc/kafka-connect-insert-uuid
对理解概念和连接记录 class 结构本身有很大帮助。
希望这对其他人有帮助。