Kafka connect tasks die with NPE randomly
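I'm running a custom Kafka Connect plugin that dumps data from Kafka into ElasticSearch, and my tasks periodically die for no apparent reason (anywhere from a few hours to a day after starting).
I see this in the connector status: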
curl -s http://localhost:8083/connectors/my-custom-sink/status | python -m json.tool
{
"connector": {
"state": "RUNNING",
"worker_id": "MASKED_IP1:8083"
},
"name": "my-custom-sink",
"tasks": [
{
"id": 0,
"state": "FAILED",
"trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access00(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
"worker_id": "MASKED_IP1:8083"
},
{
"id": 1,
"state": "FAILED",
"trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n\tat org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)\n\tat org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.access00(DistributedHerder.java:101)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:848)\n\tat org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:844)\n\tat java.util.concurrent.FutureTask.run(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.lang.Thread.run(Unknown Source)\n",
"worker_id": "MASKED_IP2:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "MASKED_IP3:8083"
}
]
}
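When several tasks are involved it helps to pull the failed ones out of this status output programmatically. The sketch below is a minimal, hypothetical helper (the function names `failed_tasks` and `restart_paths` are made up for illustration). It parses JSON shaped like the status response above (traces shortened) and builds paths for Kafka Connect's task-restart REST endpoint, `POST /connectors/<name>/tasks/<id>/restart`. It makes no network calls.

```python
import json

# Status JSON shaped like the output above; traces shortened for brevity.
# A raw string keeps the JSON "\n" / "\t" escapes intact.
SAMPLE = r'''
{
  "connector": {"state": "RUNNING", "worker_id": "MASKED_IP1:8083"},
  "name": "my-custom-sink",
  "tasks": [
    {"id": 0, "state": "FAILED",
     "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n",
     "worker_id": "MASKED_IP1:8083"},
    {"id": 1, "state": "FAILED",
     "trace": "java.lang.NullPointerException\n\tat org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)\n",
     "worker_id": "MASKED_IP2:8083"},
    {"id": 2, "state": "RUNNING", "worker_id": "MASKED_IP3:8083"}
  ]
}
'''


def failed_tasks(status):
    """Return (task id, worker id, exception line) for every FAILED task."""
    result = []
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            # The first line of a Java stack trace names the exception class.
            first_line = trace.splitlines()[0] if trace else ""
            result.append((task["id"], task["worker_id"], first_line))
    return result


def restart_paths(status):
    """Build the REST paths that would restart each FAILED task."""
    name = status["name"]
    return [
        f"/connectors/{name}/tasks/{task_id}/restart"
        for task_id, _, _ in failed_tasks(status)
    ]


if __name__ == "__main__":
    status = json.loads(SAMPLE)
    for task_id, worker, error in failed_tasks(status):
        print(f"task {task_id} on {worker}: {error}")
    for path in restart_paths(status):
        print("POST", path)
```

Each printed path can be sent with something like `curl -X POST http://localhost:8083<path>`. Note that a restart only works around the symptom: if the worker keeps reading a broken task config, the task will likely fail again the same way.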
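Any insight would be appreciated (i.e., what is Kafka trying to do when it throws the NPE?).
Update:
This is Kafka 0.10.2.0.
Here is what I see in the logs before the worker dies: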
INFO 2017-07-30 07:38:17,146 [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka version : 0.10.2.0
INFO 2017-07-30 07:38:17,146 [DistributedHerder] org.apache.kafka.common.utils.AppInfoParser: Kafka commitId : 576d93a8dc0cf421
INFO 2017-07-30 07:38:17,172 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN:9092 (id: 2147483643 rack: null) for group my-group.
INFO 2017-07-30 07:38:17,223 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,223 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,223 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,224 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,224 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,226 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Removed connector my-group due to null configuration. This is usually intentional and does not indicate an issue.
INFO 2017-07-30 07:38:17,228 [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Finished reading KafkaBasedLog for topic connect-configs
INFO 2017-07-30 07:38:17,228 [DistributedHerder] org.apache.kafka.connect.util.KafkaBasedLog: Started KafkaBasedLog for topic connect-configs
INFO 2017-07-30 07:38:17,228 [DistributedHerder] org.apache.kafka.connect.storage.KafkaConfigBackingStore: Started KafkaConfigBackingStore
INFO 2017-07-30 07:38:17,233 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Herder started
INFO 2017-07-30 07:38:17,239 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Discovered coordinator MASKED_FQDN1:9092 (id: 2147483643 rack: null) for group my-group.
INFO 2017-07-30 07:38:17,241 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: (Re-)joining group my-group
INFO 2017-07-30 07:38:20,868 [CLASSPATH traversal thread.] org.reflections.Reflections: Reflections took 3600 ms to scan 1 urls, producing 7612 keys and 36389 values
INFO 2017-07-30 07:38:22,511 [DistributedHerder] org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Successfully joined group my-group with generation 804011
INFO 2017-07-30 07:38:22,516 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Joined group and got assignment: Assignment{error=0, leader='connect-1-29930623-f17a-43ca-bed9-7f8a6fb5317a', leaderUrl='http://MASKED_IP:8083/', offset=2, connectorIds=[], taskIds=[my-group]}
WARN 2017-07-30 07:38:22,521 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Catching up to assignment's config offset.
INFO 2017-07-30 07:38:22,521 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Current config state offset -1 is behind group assignment 2, reading to end of config log
INFO 2017-07-30 07:38:22,846 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished reading to end of log and updated config snapshot, new config log offset: 2
INFO 2017-07-30 07:38:22,846 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting connectors and tasks using config offset 2
INFO 2017-07-30 07:38:22,849 [pool-6-thread-1] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Starting task my-group
INFO 2017-07-30 07:38:22,849 [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Creating task my-group
INFO 2017-07-30 07:38:22,851 [pool-6-thread-1] org.apache.kafka.connect.runtime.ConnectorConfig: ConnectorConfig values:
connector.class = IndexerSinkConnector
key.converter = null
name = my-group
tasks.max = 3
transforms = null
value.converter = null
ERROR 2017-07-30 07:38:22,856 [pool-6-thread-1] org.apache.kafka.connect.runtime.Worker: Failed to start task my-group
java.lang.NullPointerException: null
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:52)
at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:313)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:834)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access00(DistributedHerder.java:101)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:848)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.call(DistributedHerder.java:844)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
INFO 2017-07-30 07:38:22,890 [DistributedHerder] org.apache.kafka.connect.runtime.distributed.DistributedHerder: Finished starting connectors and tasks
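The NPE originates here:
https://github.com/apache/kafka/blob/0.10.2.0/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L52
Does your config topic have more than one partition? The config topic must have exactly one partition. This requirement is strictly enforced starting with 0.11, so later versions should not run into this problem.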
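To make the single-partition requirement concrete, here is a toy Python model. It is hypothetical and heavily simplified, not Connect's actual `KafkaConfigBackingStore` logic. It assumes Connect writes one config record per task followed by a commit record, and that a task whose config record has not been read by the time the commit is read ends up with no config. Kafka only preserves order within a partition, so with several partitions the commit can be read before some task configs. The record keys are only loosely modeled on Connect's, the round-robin `partition` function stands in for the producer's key-based partitioning, and `start_task` stands in for `AbstractConfig`'s constructor iterating over a null map.

```python
# Hypothetical config-topic records, in the order they were written.
RECORDS = [
    ("task-my-custom-sink-0", {"task.id": "0"}),
    ("task-my-custom-sink-1", {"task.id": "1"}),
    ("task-my-custom-sink-2", {"task.id": "2"}),
    ("commit-my-custom-sink", {"tasks": 3}),
]


def partition(records, num_partitions):
    """Round-robin stand-in for key-based partitioning; order is kept per partition only."""
    parts = [[] for _ in range(num_partitions)]
    for i, record in enumerate(records):
        parts[i % num_partitions].append(record)
    return parts


def replay(parts):
    """Read partitions one after another (one legal read order) and apply commits."""
    pending, committed = {}, {}
    for part in parts:
        for key, value in part:
            if key.startswith("task-"):
                pending[key] = value
            elif key.startswith("commit-"):
                connector = key[len("commit-"):]
                for i in range(value["tasks"]):
                    task_key = f"task-{connector}-{i}"
                    # A task config that has not been read yet is recorded as None.
                    committed[task_key] = pending.pop(task_key, None)
    return committed


def start_task(task_config):
    # Iterating over a None "map" fails, analogous to the NPE in AbstractConfig.
    return {str(k): v for k, v in task_config.items()}


def try_start_all(committed):
    """Try to start every committed task and report RUNNING or FAILED."""
    states = {}
    for task_key, config in sorted(committed.items()):
        try:
            start_task(config)
            states[task_key] = "RUNNING"
        except AttributeError:
            states[task_key] = "FAILED"
    return states


if __name__ == "__main__":
    print("1 partition: ", try_start_all(replay(partition(RECORDS, 1))))
    print("3 partitions:", try_start_all(replay(partition(RECORDS, 3))))
```

With one partition every task gets its config. With three partitions the commit lands in the same partition as task 0 and is read before the other task configs, so tasks 1 and 2 end up with no config and fail to start. Which tasks are affected in practice depends on how the record keys hash to partitions and on the order the consumer reads them.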