Kafka Sink Connector Elastic Cloud

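I'm trying to connect Kafka to Elastic Cloud using a URL, username, and password. I get a 404 error, so I'm guessing my configuration isn't right for connecting to an Elastic cluster. How do I connect to Elastic Cloud?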

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=_doc
connection.password=my_pass_elastic
topics=demo-topic-distributed
tasks.max=1
connection.username=my_user_elastic
connection.url=https://xxxxxxxxxxxxxxxxx.xx-xxx-x.aws.found.io:port
value.converter=org.apache.kafka.connect.json.JsonConverter
key.ignore=true
key.converter=org.apache.kafka.connect.storage.StringConverter
schema.ignore=true


org.apache.kafka.connect.errors.ConnectException: Could not create index 'demo-topic-distributed': 404 Not Found
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndex(JestElasticsearchClient.java:458)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:425)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:374)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:131)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:617)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access00(WorkerSinkTask.java:71)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:682)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:430)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:454)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The credentials I use in the connection.username and connection.password fields are the same ones I use to access Kibana from the cluster.

Specify the URL and credentials of your Elasticsearch host, not Kibana.

In the Elastic Cloud console, you should have a Copy endpoint button for the Elasticsearch deployment.
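
As a rough sketch of what that change might look like, only the connection settings from the question need to be touched. The host below is a placeholder, not a real endpoint; paste in whatever the Copy endpoint button gives you for Elasticsearch, and keep the port as it appears there. I'm assuming the username and password are valid for the Elasticsearch deployment itself.

```properties
# Placeholder: replace with the Elasticsearch endpoint copied from Elastic Cloud.
# This must be the Elasticsearch endpoint, not the Kibana one.
connection.url=https://<your-elasticsearch-endpoint>:port
# Credentials for the Elasticsearch deployment (placeholders from the question).
connection.username=my_user_elastic
connection.password=my_pass_elastic
```

The rest of the connector config (topics, converters, key.ignore, schema.ignore) can stay as posted. If the 404 on index creation persists after pointing at the Elasticsearch endpoint, the URL is probably still not the Elasticsearch one.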