kafka-connect-elasticsearch: 如何根据Kafka主题header中的某个值删除文档

kafka-connect-elasticsearch: How to delete document based on certain value in header of the Kafka topic

我正在使用 Kafka Connect 加快速度。尝试使用 Kafka Connect Elasticsearch Service 接收器连接器将我们的数据从 Kafka 移动到 Elasticsearch。 我有一个看起来像这样的处理流:

来自s3的文件记录->来自发布到->kafka主题->Kafka连接->Elastic Search的源应用程序的自定义处理

这适用于 create/update 的场景。但是我们想要处理文件的删除场景。我们的应用程序为删除操作发布一个事件,并将其设置为 Kafka 消息中 header 值的一部分。我们不想使用此删除操作信息更新弹性文档,而是想删除文档本身。

我们如何使用 Kafka Connect 读取此 header 值并从 Elastic 中删除给定键的文档?

提前感谢您的帮助。

此致, 维卡斯

已编辑: 我尝试转换的消息示例:

[{
    "key": "fileid=05ffefea-a71d-4bb7-091e-08d8f9229806",
    "rownum": 0,
    "metadata": {
        "offset": 1468950,
        "partition": 3,
        "timestamp": 1617773161088,
        "__keysize": 43,
        "__valsize": 596
    },
    "headers": {
        "sub-tenant-id": "",
        "actiondate": "2021-04-07T05:26:01.0790010Z",
        "action": "uploaded",
        "contentversion": "V1"
    },
    "value": {
        "id": "fil.05ffefeaa71d4bb7091e08d8f9229806",
        "name": "4.txt",
        "volumeId": "vol.e25196dc9e2f460bb27308d8f8405691",
        "volumeName": "projdmck0405",
        "type": "text/plain",
        "subTenantId": "",
        "path": "/4.txt",
        "timeCreated": "2021-04-07T05:25:46.129Z",
        "timeModified": "2021-04-07T05:25:46.129Z",
        "urn": "urn:mycompany:product:test:app:file:fil.05ffefeaa71d4bb7091e08d8f9229806#/4.txt",
        "sizeInBytes": 76,
        "isUploaded": true,
        "archiveStatus": "None",
        "storageTier": "Standard",
        "eTag": "11fb9ec5531d90d571b331cc39e43175"
    }
}]

我正在尝试将 action header fieldvalue 添加到消息的 body 中。

这是我在示例中使用的转换: https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-transform-common/transformations/examples/HeaderToField.headertofield.html

 "transforms"                            : "dropNullRecords,headerToField",

 "transforms.headerToField.type"             : "com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value",
      "transforms.headerToField.header.mappings"  : "action:STRING:actioninbody"

我确实按照示例使用“action:STRING”的映射值进行了尝试,然后我注意到提到的格式为:

The format is <header name>:<header type>[:field name]. 

我错过了什么?

我做到了。 最终编写了自定义 SMT。使用 SMT,我可以访问连接记录,包括 header 和值。所以我只是一个一个地读取 header 值,当遇到我感兴趣的值时,我将连接记录的值设置为空。除此之外,Kafka Connect 还公开了以下参数:

behavior.on.null.values
How to handle records with a non-null key and a null value (for example, Kafka tombstone records). Valid options are IGNORE, DELETE, and FAIL.

Type: string
Default: FAIL
Valid Values: (case insensitive) [DELETE, IGNORE, FAIL]
Importance: low

我将值设置为 DELETE,它开始从 ES 索引中删除记录。

我遵循了 Confluent 中的自定义 SMT 示例: https://github.com/confluentinc/kafka-connect-insert-uuid

对理解概念和连接记录 class 结构本身有很大帮助。

希望这对其他人有帮助。