推送自己的id。融合 kafka 连接 elasticsearch docker
Push own id. Confluent kafka connect elasticsearch docker
我正在使用 confluentinc/cp-kafka-connect docker 图片。
我正在尝试使用 elasticsearch id 将 JSON 文件发送到 kafka。
{"_id":10000725, "_source": {"createdByIdentity":"tu_adminn","createdBy":"Admin Testuser"}}
这是我的连接器
{
"name": "test-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "andrii",
"key.ignore": "false",
"schema.ignore": "true",
"connection.url": "http://elasticsearch:9200",
"type.name": "test-type",
"name": "elasticsearch-sink"
}
}
当我使用 key.ignore = true 时,它会生成一些奇怪的 ID。
我怎样才能准确传递我的 ID 和来源?
如果您指定 key.ignore=true
,那么 Kafka Connect 将使用您消息的 kafka 主题、分区和偏移量的复合键——这就是您要使用的 "weird id"看到。
如果创建的Elasticsearch文档要使用自己的ID,可以设置key.ignore=false
,Kafka Connect会使用Kafka消息的key 作为 ID。
如果您的 Kafka 消息没有适合您要执行的操作的密钥,您将需要设置它。一种选择是使用类似 KSQL 的东西:
CREATE STREAM target AS SELECT * FROM source PARTITION BY _id
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作
我正在使用 confluentinc/cp-kafka-connect docker 图片。 我正在尝试使用 elasticsearch id 将 JSON 文件发送到 kafka。
{"_id":10000725, "_source": {"createdByIdentity":"tu_adminn","createdBy":"Admin Testuser"}}
这是我的连接器
{
"name": "test-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "andrii",
"key.ignore": "false",
"schema.ignore": "true",
"connection.url": "http://elasticsearch:9200",
"type.name": "test-type",
"name": "elasticsearch-sink"
}
}
当我使用 key.ignore = true 时,它会生成一些奇怪的 ID。 我怎样才能准确传递我的 ID 和来源?
如果您指定
key.ignore=true
,那么 Kafka Connect 将使用您消息的 kafka 主题、分区和偏移量的复合键——这就是您要使用的 "weird id"看到。如果创建的Elasticsearch文档要使用自己的ID,可以设置
key.ignore=false
,Kafka Connect会使用Kafka消息的key 作为 ID。
如果您的 Kafka 消息没有适合您要执行的操作的密钥,您将需要设置它。一种选择是使用类似 KSQL 的东西:
CREATE STREAM target AS SELECT * FROM source PARTITION BY _id
免责声明:我在开源 KSQL 项目背后的公司 Confluent 工作