Kafka Connect:java.lang.IllegalStateException:当前没有分区分配
Kafka Connect: java.lang.IllegalStateException: No current assignment for partition
我是 运行 Kafka Connect on Kubernetes(8-16 个节点,自动缩放)。我总共定义了 44 个连接器,每个 Kafka 主题一个(每个主题一个分区)。这些主题由 Debezium / Postgresql 制作。有 3 个 Kafka 节点。每个连接器 tasks.max 设置为 4。我的大多数连接器(但不是每个!)都有一个(总是一个)失败的任务,原因是 java.lang.IllegalStateException:分区 -0 没有当前分配。
这里不是 Kafka 专家,注意 ;) 我假设有 3 个 Kafka 节点,所以 3 个工作人员做得很好,而第 4 个任务没有任何连接,所以它失败了。但是为什么有时候有4个任务运行就好了?
此外,我经常遇到 "Conflicting operation due to rebalancing" 问题,可能持续几分钟,甚至几小时。最近我删除了所有 pods 并且他们重新启动了自己,问题消失了,但这不是长期解决方案。
tasks.max 推荐值是多少?提前致谢!
异常:
java.lang.IllegalStateException: No current assignment for partition table-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access00(WorkerSinkTask.java:70)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748
接收器连接器配置:
connector.class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas true
sanitizeTopics true
autoCreateTables true
topics <topic-name>
tasks.max 3
schemaRegistryLocation http://<ip>:8081
project <big-query-project>
maxWriteSize 10000
datasets .*=<big-query-dataset>
task.class com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile /credentials/<credentials-file>.json
name <connector-name>
schemaRetriever com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait 1000
bufferSize 100000
并抛出上述异常java.lang.IllegalStateException: No current assignment for [...]
属性 tasks.max
的值取决于几个因素。最重要的一个是特定的连接器。
特定连接器取决于其逻辑和 tasks.max
的值计算将创建的 Task
的数量。
前任。 FileStreamSourceConnector
始终创建 1 个任务,因此即使您传递的值高于 1,它也只会创建一个。
同样的情况是 PostgresConnector
它 平行 到一个。
tasks.max
值还应取决于其他因素,例如:Kafka Connect 模式、您拥有多少个 Kafka Connect 实例、CPU 机器等
我怎么知道你在使用 SourceConnector (PostgresConnector
)。
源连接器不从 Kafka 轮询数据。例外,您发布的内容与某些 SinkConnector
有关。
如果用户正在使用 SinkConnector
,您的 tasks.max
不应超过分区数。
如果您启动的任务多于分区数,一些任务将处于空闲状态(状态为 运行,但它们不处理数据)并且可能会发生重新平衡。
我是 运行 Kafka Connect on Kubernetes(8-16 个节点,自动缩放)。我总共定义了 44 个连接器,每个 Kafka 主题一个(每个主题一个分区)。这些主题由 Debezium / Postgresql 制作。有 3 个 Kafka 节点。每个连接器 tasks.max 设置为 4。我的大多数连接器(但不是每个!)都有一个(总是一个)失败的任务,原因是 java.lang.IllegalStateException:分区 -0 没有当前分配。
这里不是 Kafka 专家,注意 ;) 我假设有 3 个 Kafka 节点,所以 3 个工作人员做得很好,而第 4 个任务没有任何连接,所以它失败了。但是为什么有时候有4个任务运行就好了?
此外,我经常遇到 "Conflicting operation due to rebalancing" 问题,可能持续几分钟,甚至几小时。最近我删除了所有 pods 并且他们重新启动了自己,问题消失了,但这不是长期解决方案。
tasks.max 推荐值是多少?提前致谢!
异常:
java.lang.IllegalStateException: No current assignment for partition table-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:259)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:264)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access00(WorkerSinkTask.java:70)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:675)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:291)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:406)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:341)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1214)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:445)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:318)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748
接收器连接器配置:
connector.class com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
autoUpdateSchemas true
sanitizeTopics true
autoCreateTables true
topics <topic-name>
tasks.max 3
schemaRegistryLocation http://<ip>:8081
project <big-query-project>
maxWriteSize 10000
datasets .*=<big-query-dataset>
task.class com.wepay.kafka.connect.bigquery.BigQuerySinkTask
keyfile /credentials/<credentials-file>.json
name <connector-name>
schemaRetriever com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever
tableWriteWait 1000
bufferSize 100000
并抛出上述异常java.lang.IllegalStateException: No current assignment for [...]
属性 tasks.max
的值取决于几个因素。最重要的一个是特定的连接器。
特定连接器取决于其逻辑和 tasks.max
的值计算将创建的 Task
的数量。
前任。 FileStreamSourceConnector
始终创建 1 个任务,因此即使您传递的值高于 1,它也只会创建一个。
同样的情况是 PostgresConnector
它 平行 到一个。
tasks.max
值还应取决于其他因素,例如:Kafka Connect 模式、您拥有多少个 Kafka Connect 实例、CPU 机器等
我怎么知道你在使用 SourceConnector (PostgresConnector
)。
源连接器不从 Kafka 轮询数据。例外,您发布的内容与某些 SinkConnector
有关。
如果用户正在使用 SinkConnector
,您的 tasks.max
不应超过分区数。
如果您启动的任务多于分区数,一些任务将处于空闲状态(状态为 运行,但它们不处理数据)并且可能会发生重新平衡。