rename index in elasticsearch with kafka sink
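I'm using the following sink. The problem is that it sets the Elasticsearch index name to the topic name, and I want a different Elasticsearch index name. How can I do this? I'm using Confluent 4.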
{
"name": "es-sink-mysql-foobar-02",
"config": {
"_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"_comment": "--- Elasticsearch-specific config ---",
"_comment": "Elasticsearch server address",
"connection.url": "http://localhost:9200",
"_comment": "Elasticsearch mapping type name. Gets created automatically if it doesn't exist",
"type.name": "kafka-connect",
"index.name": "asimtest",
"_comment": "Which topic to stream data from into Elasticsearch",
"topics": "mysql-foobar",
"_comment": "If the Kafka message doesn't have a key (as is the case with JDBC source) you need to specify key.ignore=true. If you don't, you'll get an error from the Connect task: 'ConnectException: Key is used as document id and can not be null.'",
"key.ignore": "true"
}
}
Use Kafka Connect's Single Message Transform (SMT) feature for this.
For example, to drop the mysql- prefix:
"_comment": "Drop the mysql- prefix from the topic name and thus Elasticsearch index name",
"transforms": "dropPrefix",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"mysql-(.*)",
"transforms.dropPrefix.replacement":"$1"
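To see what this does to the topic name (and therefore to the index name the Elasticsearch sink writes to), here is a small Python sketch that approximates RegexRouter. It assumes RegexRouter's behaviour of only rewriting the topic when the regex matches the whole topic name, and it translates Java-style `$1` group references into Python's `\g<1>`. The function name `regex_route` is hypothetical, not part of any Kafka API.

```python
import re


def regex_route(topic: str, regex: str, replacement: str) -> str:
    """Approximate Kafka Connect's RegexRouter for one topic name.

    Assumption: as with RegexRouter, the topic is rewritten only when the
    regex matches the entire topic name; otherwise it is returned unchanged.
    """
    pattern = re.compile(regex)
    if pattern.fullmatch(topic) is None:
        return topic
    # Convert Java-style group references ($1, $2, ...) to Python's \g<N> form.
    py_replacement = re.sub(r"\$(\d+)", r"\\g<\1>", replacement)
    return pattern.sub(py_replacement, topic, count=1)


if __name__ == "__main__":
    print(regex_route("mysql-foobar", "mysql-(.*)", "$1"))  # foobar
    print(regex_route("other-topic", "mysql-(.*)", "$1"))   # unchanged: no full match
```

Note that with an empty replacement the whole matched topic name is replaced by an empty string, so the replacement has to reference the captured group (`$1`) to keep the part after the prefix.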
Or drop the prefix and route the messages to a time-based Elasticsearch index:
"transforms":"dropPrefix,routeTS",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"mysql-(.*)",
"transforms.dropPrefix.replacement":"$1",
"transforms.routeTS.type":"org.apache.kafka.connect.transforms.TimestampRouter",
"transforms.routeTS.topic.format":"kafka-${topic}-${timestamp}",
"transforms.routeTS.timestamp.format":"yyyyMM"
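The two transforms run in the order listed in `transforms`, so the index name is built from the already-shortened topic. The Python sketch below approximates that chain for one record. Assumptions: the record timestamp is epoch milliseconds and is formatted in UTC, and `%Y%m` stands in for the Java year-month pattern; the function names are hypothetical.

```python
import re
from datetime import datetime, timezone


def drop_prefix(topic: str) -> str:
    """Approximate the dropPrefix step: regex mysql-(.*) -> $1, full match required."""
    m = re.fullmatch(r"mysql-(.*)", topic)
    return m.group(1) if m else topic


def route_ts(topic: str, timestamp_ms: int,
             topic_format: str = "kafka-${topic}-${timestamp}",
             strftime_format: str = "%Y%m") -> str:
    """Approximate the routeTS step (TimestampRouter).

    Assumption: the timestamp is formatted in UTC; "%Y%m" mirrors "yyyyMM".
    """
    ts = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    return (topic_format
            .replace("${topic}", topic)
            .replace("${timestamp}", ts.strftime(strftime_format)))


def index_for(topic: str, timestamp_ms: int) -> str:
    # dropPrefix runs first, then routeTS, matching "transforms":"dropPrefix,routeTS".
    return route_ts(drop_prefix(topic), timestamp_ms)


if __name__ == "__main__":
    # 1514764800000 ms is 2018-01-01T00:00:00Z.
    print(index_for("mysql-foobar", 1514764800000))
```

Records from `mysql-foobar` produced in January 2018 would therefore land in an index named along the lines of `kafka-foobar-201801`, with a new index each month.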
For more details, see https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/.
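One way to apply the transform to the existing connector is the Kafka Connect REST endpoint `PUT /connectors/<name>/config`, which creates the connector or updates its configuration. The Python sketch below builds that request with only the standard library. Assumptions: the worker listens on `http://localhost:8083` (the default REST port), and the config dict is the question's config plus the `dropPrefix` transform. The `index.name` entry is left out because the index name here comes from the transformed topic name. `build_put_config_request` is a hypothetical helper.

```python
import json
import urllib.request

# Assumed Connect worker address; 8083 is the default REST port.
CONNECT_URL = "http://localhost:8083"


def build_put_config_request(connect_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build a PUT /connectors/<name>/config request (create or update)."""
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


config = {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://localhost:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "connection.url": "http://localhost:9200",
    "type.name": "kafka-connect",
    "topics": "mysql-foobar",
    "key.ignore": "true",
    # Rename the topic (and thus the Elasticsearch index) with a RegexRouter SMT.
    "transforms": "dropPrefix",
    "transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.dropPrefix.regex": "mysql-(.*)",
    "transforms.dropPrefix.replacement": "$1",
}

if __name__ == "__main__":
    req = build_put_config_request(CONNECT_URL, "es-sink-mysql-foobar-02", config)
    # Sends the request; requires a running Kafka Connect worker.
    with urllib.request.urlopen(req) as resp:
        print(resp.status, resp.read().decode("utf-8"))
```

The PUT form sends the flat config map (no `name`/`config` wrapper), which makes it convenient to re-run after each change.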