推送自己的id。融合 kafka 连接 elasticsearch docker

Push own id. Confluent kafka connect elasticsearch docker

我正在使用 confluentinc/cp-kafka-connect docker 图片。 我正在尝试使用 elasticsearch id 将 JSON 文件发送到 kafka。

{"_id":10000725, "_source": {"createdByIdentity":"tu_adminn","createdBy":"Admin Testuser"}}

这是我的连接器

{
  "name": "test-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "andrii",
    "key.ignore": "false",
    "schema.ignore": "true",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "test-type",
    "name": "elasticsearch-sink"
  }
}

当我使用 key.ignore = true 时,它​​会生成一些奇怪的 ID。 我怎样才能准确传递我的 ID 和来源?

Per the docs:

  • 如果您指定 key.ignore=true,那么 Kafka Connect 将使用您消息的 kafka 主题、分区和偏移量的复合键——这就是您要使用的 "weird id"看到。

  • 如果创建的Elasticsearch文档要使用自己的ID,可以设置key.ignore=false,Kafka Connect会使用Kafka消息的key 作为 ID。

如果您的 Kafka 消息没有适合您要执行的操作的密钥,您将需要设置它。一种选择是使用类似 KSQL 的东西:

CREATE STREAM target AS SELECT * FROM source PARTITION BY _id

免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作