Elasticsearch Kafka Connector——根据消息值设置索引
Elasticsearch Kafka Connector - setting index based on message value
我有以下格式的来自 Kafka 主题的消息:
{"elasticsearch_index": "index_1", "first_name": "Jane"}
{"elasticsearch_index": "index_2", "first_name": "John"}
请注意,每条消息都包含我们希望将记录路由到的所需 Elasticsearch 索引。是否可以使用 Confluent 的 Elasticsearch Kafka 连接器将这些记录路由到适当的索引名称(例如 elasticsearch_index
键下列出的任何内容)?
目前单消息转换 (SMT) 似乎不支持此行为,但也许我误读了。任何信息将不胜感激。
两个选项:
- 使用 Single Message Transform API
编写您自己的转换
- 首先使用 KSQL(或 Kafka Streams)将消息路由到所需的主题,然后使用新的 (Apache Kafka 1.1) 正则表达式功能将这些主题从 Kafka Connect 发送到 Elasticsearch。
我有以下格式的来自 Kafka 主题的消息:
{"elasticsearch_index": "index_1", "first_name": "Jane"}
{"elasticsearch_index": "index_2", "first_name": "John"}
请注意,每条消息都包含我们希望将记录路由到的所需 Elasticsearch 索引。是否可以使用 Confluent 的 Elasticsearch Kafka 连接器将这些记录路由到适当的索引名称(例如 elasticsearch_index
键下列出的任何内容)?
目前单消息转换 (SMT) 似乎不支持此行为,但也许我误读了。任何信息将不胜感激。
两个选项:
- 使用 Single Message Transform API 编写您自己的转换
- 首先使用 KSQL(或 Kafka Streams)将消息路由到所需的主题,然后使用新的 (Apache Kafka 1.1) 正则表达式功能将这些主题从 Kafka Connect 发送到 Elasticsearch。