Kafka 连接 ElasticSearch sink - 使用 if-else 块提取和转换不同主题的字段

Kafka connect ElasticSearch sink - using if-else blocks to extract and transform fields for different topics

我有一个如下所示的 kafka es 接收器属性文件

name=elasticsearch.sink.direct
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=16
topics=data.my_setting

connection.url=http://dev-elastic-search01:9200
type.name=logs
topic.index.map=data.my_setting:direct_my_setting_index
batch.size=2048
max.buffered.records=32768
flush.timeout.ms=60000
max.retries=10
retry.backoff.ms=1000
schema.ignore=true
transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=MY_SETTING_ID
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=MY_SETTING_ID

这非常适合单个主题 (data.my_setting)。我想对来自多个主题的数据使用相同的连接器。不同主题中的消息将有不同的键,我需要它 transform.I 想知道是否有办法使用 if else 语句,条件是主题名称或消息中的单个字段,这样然后我可以以不同的方式转换密钥。所有传入的消息都 json 具有架构和负载。

根据答案更新

在我的 jdbc 连接器中,我添加密钥如下:

name=data.my_setting
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
poll.interval.ms=500
tasks.max=4
mode=timestamp
query=SELECT * FROM MY_TABLE with (nolock)
timestamp.column.name=LAST_MOD_DATE
topic.prefix=investment.ed.data.app_setting

transforms=ValueToKey
transforms.ValueToKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.ValueToKey.fields=MY_SETTING_ID

但是,当 elasticsearch sink 读取从此连接器生成的消息时,我仍然收到错误消息

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
Caused by: org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id

有效负载如下所示:

{
"schema": {
    "type": "struct",
    "fields": [{
            "type": "int32",
            "optional": false,
            "field": "MY_SETTING_ID"
        }, {
            "type": "string",
            "optional": true,
            "field": "MY_SETTING_NAME"
        }
    ],
    "optional": false
},
"payload": {
    "MY_SETTING_ID": 9,
    "MY_SETTING_NAME": "setting_name"
}
}

独立连接 属性 文件如下所示:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter 
value.converter=org.apache.kafka.connect.json.JsonConverter 
converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter 
internal.value.converter=org.apache.kafka.connect.json.JsonConverter 
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/apps/{env}/logs/infrastructure/offsets/connect.offsets
rest.port=8084
plugin.path=/usr/share/java

有没有一种方法可以实现我的目标,即发送来自多个主题(在我的例子中是数据库表)的消息,这些主题将具有自己唯一的 ID(也将是 ES 中文档的 ID)到单个 ES 接收器。

我可以使用 avro 来完成这个任务吗?有没有办法在模式注册表中定义密钥,或者我会 运行 遇到同样的问题?

这是不可能的。如果关键字段不同,您将需要多个连接器。

要考虑的一个选项是通过流处理器(例如 Kafka Streams、KSQL、Spark Streaming 等)预处理 Kafka 主题以标准化关键字段,这样您就可以使用单个连接器。这取决于您正在构建的内容是否值得这样做,或者矫枉过正。