Tasks are empty after deploying ElasticsearchSinkConnector
I tried to deploy ElasticsearchSinkConnector via POST /connectors:
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "tasks.max": "1",
    "topics": "my_db_server.public.my_table",
    "connection.url": "https://my-elasticsearch.com:9200",
    "connection.username": "xxx",
    "connection.password": "xxx",
    "key.ignore": "true",
    "schema.ignore": "true",
    "elastic.security.protocol": "SSL",
    "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
    "elastic.https.ssl.keystore.password": "xxx",
    "elastic.https.ssl.key.password": "xxx",
    "elastic.https.ssl.keystore.type": "JKS",
    "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
    "elastic.https.ssl.truststore.password": "xxx",
    "elastic.https.ssl.truststore.type": "JKS",
    "elastic.https.ssl.protocol": "TLS"
  }
}
It deployed successfully. However, when I check the status via GET /connectors/elasticsearch-sink/status, tasks is an empty array []:
{
  "name": "elasticsearch-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.xxx.xxx.xxx:8083"
  },
  "tasks": [],
  "type": "sink"
}
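I found this question: Kafka Connect: No tasks created for a connector
However, I tried both answers there. Changing the name, and deleting the ElasticsearchSinkConnector and redeploying it several times, did not work for me.
Also, there are no logs in the Kafka Connect pod.
Any ideas? Thanks!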
After adding these two settings to the ElasticsearchSinkConnector config:
"errors.log.include.messages": "true",
"errors.log.enable": "true"
so that the request looks like this:
POST /connectors
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type.name": "_doc",
    "tasks.max": "1",
    "topics": "my_db_server.public.my_table",
    "connection.url": "https://my-elasticsearch.com:9200",
    "connection.username": "xxx",
    "connection.password": "xxx",
    "key.ignore": "true",
    "schema.ignore": "true",
    "elastic.security.protocol": "SSL",
    "elastic.https.ssl.keystore.location": "path/to/keystore.jks",
    "elastic.https.ssl.keystore.password": "xxx",
    "elastic.https.ssl.key.password": "xxx",
    "elastic.https.ssl.keystore.type": "JKS",
    "elastic.https.ssl.truststore.location": "path/to/truststore.jks",
    "elastic.https.ssl.truststore.password": "xxx",
    "elastic.https.ssl.truststore.type": "JKS",
    "elastic.https.ssl.protocol": "TLS",
    "errors.log.include.messages": "true",
    "errors.log.enable": "true"
  }
}
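As an alternative to deleting and re-POSTing the connector, the same two settings can be applied to an existing connector with the Kafka Connect REST endpoint PUT /connectors/{name}/config, which takes the flat config map rather than the name/config wrapper. The following is only a rough sketch: the worker address http://localhost:8083 and the helper name build_config_update are assumptions for illustration, and the request is built but not sent.

```python
import json
import urllib.request

# Assumption: address of the Kafka Connect worker's REST API.
CONNECT_URL = "http://localhost:8083"

# The two error-logging settings from the post.
ERROR_LOGGING = {
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
}


def build_config_update(name, config, base_url=CONNECT_URL):
    """Build (but do not send) a PUT /connectors/{name}/config request.

    `config` is the flat map found under "config" in the POST body.
    """
    merged = {**config, **ERROR_LOGGING}
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/config",
        data=json.dumps(merged).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )
```

Passing the returned object to urllib.request.urlopen would send it; whether the worker needs authentication or TLS is not covered here.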
This time, when I check the status via GET /connectors/elasticsearch-sink/status, tasks now shows the error:
{
  "name": "elasticsearch-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.xxx.xxx.xxx:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.xxx.xxx.xxx:8083",
      "trace": "org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: connect-elasticsearch-sink\n"
    }
  ],
  "type": "sink"
}
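Since the connector itself reports RUNNING in both responses, the tasks array is the part worth inspecting. A minimal sketch of that check, assuming the status response has already been parsed into a Python dict (the function name summarize_status is made up for this example):

```python
def summarize_status(status):
    """Return a list of human-readable problems found in a status response."""
    tasks = status.get("tasks", [])
    if not tasks:
        # The connector can be RUNNING while no task was ever created.
        return ["no tasks were created"]

    problems = []
    for task in tasks:
        if task.get("state") == "FAILED":
            trace = task.get("trace", "").strip()
            # Keep only the first line of the stack trace.
            first_line = trace.splitlines()[0] if trace else "(no trace)"
            problems.append(f"task {task.get('id')} FAILED: {first_line}")
    return problems
```

An empty list from this function means every listed task is in a state other than FAILED; it does not prove that data is actually flowing.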
Now I just need to fix the permission issue.
Also, I found a good article on how to debug this: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
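For context on the permission error: by default a sink connector consumes with the group ID "connect-" followed by the connector name, which matches the connect-elasticsearch-sink group in the trace. If the cluster uses Kafka's built-in ACLs, one possible fix is to grant Read on that group to the principal the Connect worker authenticates as. The sketch below only builds the command string; the principal User:connect and the broker address are placeholders, not values from this setup, and a secured cluster would likely also need client credentials passed to the tool.

```python
import shlex


def sink_group_id(connector_name):
    """Default consumer group used by a sink connector's tasks."""
    return f"connect-{connector_name}"


def acl_command(connector_name, principal, bootstrap_server):
    """Build a kafka-acls command granting Read on the connector's group.

    The script is named kafka-acls.sh in the Apache Kafka distribution.
    """
    args = [
        "kafka-acls",
        "--bootstrap-server", bootstrap_server,
        "--add",
        "--allow-principal", principal,
        "--operation", "Read",
        "--group", sink_group_id(connector_name),
    ]
    return " ".join(shlex.quote(a) for a in args)


# Placeholder principal and broker address, for illustration only.
print(acl_command("elasticsearch-sink", "User:connect", "localhost:9092"))
```

The sink also needs Read on the topic it consumes, so a similar grant for my_db_server.public.my_table may be required if that is not already in place.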