Azure Event Hubs for Kafka: 2 consumers from the same group rebalance infinitely
I'm using Azure Event Hubs for Kafka with Spring Kafka 1.3.5 (for compatibility reasons) on the consumer side. Here is my configuration:
@EnableKafka
@Configuration
class EventHubsKafkaConfig(@Value("\${eventhubs.broker}") val eventHubsBroker: String,
                           @Value("\${eventhubs.new-mails.shared-access-key}") val newMailsEventHubSharedKey: String,
                           @Value("\${eventhubs.consumer-group}") val consumerGroup: String) {

    @Bean
    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, NewMailEvent>):
            ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent> {
        val factory = ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent>()
        factory.consumerFactory = consumerFactory
        return factory
    }

    @Bean
    fun consumerFactory(consumerConfigs: Map<String, Any>) =
            DefaultKafkaConsumerFactory<Int, NewMailEvent>(consumerConfigs, IntegerDeserializer(),
                    JsonDeserializer(NewMailEvent::class.java, jacksonObjectMapper()))

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
                "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"
        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
                ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                SaslConfigs.SASL_MECHANISM to "PLAIN",
                // Event Hubs expects the literal user name "$ConnectionString", hence the escaped dollar sign
                SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"\$ConnectionString\" password=\"$connectionString\";",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
        )
    }
}
And the consumer component:
@Component
class NewMailEventConsumer {

    @KafkaListener(topics = ["\${eventhubs.new-mails.topic-name}"])
    fun newMails(newMailEvent: NewMailEvent) {
        logger.info { "new mail event: $newMailEvent" }
    }

    companion object : KLogging()
}
data class NewMailEvent(val mailbox: String, val mailUuid: String)
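For reference, the configuration above resolves these Spring properties. A minimal application.properties sketch, with the broker, group and topic taken from the log output below and the shared access key left as a placeholder:

eventhubs.broker=bap-event-hubs-dev.servicebus.windows.net
eventhubs.consumer-group=offer-application-bff-local
eventhubs.new-mails.topic-name=offer-mail-crawler-new-mails
eventhubs.new-mails.shared-access-key=<key of the RootManageSharedAccessKey policy>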
When I start 2 consumer applications with this code, I see strange warnings that never end:
Successfully joined group offer-application-bff-local with generation 5
web_1 | 2018-07-09 11:20:42.950 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1 | 2018-07-09 11:20:42.983 INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1 | 2018-07-09 11:21:28.686 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1 | 2018-07-09 11:21:28.687 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1 | 2018-07-09 11:21:28.687 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1 | 2018-07-09 11:21:28.687 INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[offer-mail-crawler-new-mails-0]
web_1 | 2018-07-09 11:21:28.688 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group offer-application-bff-local
web_1 | 2018-07-09 11:21:29.670 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) dead for group offer-application-bff-local
web_1 | 2018-07-09 11:21:43.099 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Discovered coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) for group offer-application-bff-local.
web_1 | 2018-07-09 11:21:43.131 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group offer-application-bff-local
web_1 | 2018-07-09 11:21:43.344 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : Successfully joined group offer-application-bff-local with generation 7
web_1 | 2018-07-09 11:21:43.345 INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1 | 2018-07-09 11:21:43.375 INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1 | 2018-07-09 11:21:46.377 WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Periodically the following exception shows up:
2018-07-09 11:36:21.602 WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.common.protocol.Errors : Unexpected error code: 60.
web_1 | 2018-07-09 11:36:21.603 ERROR 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Container exception
web_1 |
web_1 | org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:504) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:204) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1 | at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:628) ~[spring-kafka-1.3.5.RELEASE.jar!/:na]
web_1 | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
web_1 | at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
And periodically this one:
Failed to send SSL Close message
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_162]
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_162]
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_162]
at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_162]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[na:1.8.0_162]
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194) ~[kafka-clients-0.11.0.2.jar:na]
With a single consumer it works like a charm: no warnings, nothing.
Does anybody know what is going wrong there?
The number of consumers with the same group ID cannot exceed the number of partitions of the given topic.
For example, a topic with 3 partitions can have 1 to 3 consumers sharing the same group ID.
I assume your topic has only one partition and the two consumers keep fighting over it. You either have to remove one of your consumers or add an extra partition to your topic.
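An easy way to verify the partition count is to describe the topic with the AdminClient that ships with kafka-clients 0.11, reusing the same SASL_SSL settings as the consumer. A minimal sketch (the function name and parameters are mine, not from the question):

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.config.SaslConfigs

fun partitionCount(broker: String, connectionString: String, topic: String): Int =
    AdminClient.create(mapOf<String, Any>(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "$broker:9093",
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
            SaslConfigs.SASL_MECHANISM to "PLAIN",
            SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                    "username=\"\$ConnectionString\" password=\"$connectionString\";"
    )).use { admin ->
        // all().get() blocks until the broker returns the topic metadata
        admin.describeTopics(listOf(topic)).all().get().getValue(topic).partitions().size
    }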
Eventually I found what the problem was.
As you can see in the code, I did not specify the client.id property for the Kafka consumer. That turns out to be crucial with spring-kafka, because it ends up using the same auto-generated client.id = consumer-0 for both consumers within the consumer group, which caused the endless partition rebalancing between the two identically named consumers. I had to set it to a partially random string, ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}", to make it work:
@Bean
fun consumerConfigs(): Map<String, Any> {
    val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"
    return mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
            // a unique client id per application instance avoids the name clash
            ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}",
            ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
            SaslConfigs.SASL_MECHANISM to "PLAIN",
            SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                    "username=\"\$ConnectionString\" password=\"$connectionString\";",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
    )
}
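The random suffix matters because Kafka's auto-generated client ids are only unique within a single JVM; two separate application instances can otherwise present the broker with identical ids, whereas a UUID per instance guarantees distinct names across processes.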