Elasticsearch Kafka Connector——根据消息值设置索引

Elasticsearch Kafka Connector - setting index based on message value

我有以下格式的来自 Kafka 主题的消息:

{"elasticsearch_index": "index_1", "first_name": "Jane"}
{"elasticsearch_index": "index_2", "first_name": "John"}

请注意,每条消息都包含我们希望将记录路由到的所需 Elasticsearch 索引。是否可以使用 Confluent 的 Elasticsearch Kafka 连接器将这些记录路由到适当的索引名称(例如 elasticsearch_index 键下列出的任何内容)?

目前单消息转换 (SMT) 似乎不支持此行为,但也许我误读了。任何信息将不胜感激。

两个选项:

  1. 使用 Single Message Transform API
  2. 编写您自己的转换
  3. 首先使用 KSQL(或 Kafka Streams)将消息路由到所需的主题,然后使用新的 (Apache Kafka 1.1) 正则表达式功能将这些主题从 Kafka Connect 发送到 Elasticsearch。