Kafka 消费者(相同的组 ID)总是从同一个分区读取

Kafka consumers (same group-id) stucked in reading always from same partition

我有 2 个消费者 运行 相同的组 ID 并从具有 3 个分区的主题读取并使用 KafkaAvroDeserializer 解析消息。消费者有这些设置:

  def avroConsumerSettings[T <: SpecificRecordBase](schemaRegistry: String, bootstrapServer: String, groupId: String)(implicit
  actorSystem: ActorSystem): ConsumerSettings[String, T] = {
     val kafkaAvroSerDeConfig = Map[String, Any](
       AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistry,
       KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString
     )
    val kafkaAvroDeserializer = new KafkaAvroDeserializer()
    kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
    val deserializer =
      kafkaAvroDeserializer.asInstanceOf[Deserializer[T]]

    ConsumerSettings(actorSystem, new StringDeserializer, deserializer)
      .withBootstrapServers(bootstrapServer)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  }

我试图发送一条格式错误的消息来测试错误处理,现在我的消费者卡住了(总是重试从同一分区读取,因为我正在使用 RestartSource.onFailuresWithBackoff);但对我来说奇怪的是(AFAIK 同一组 ID 中的每个消费者都无法从同一分区读取)是如果我 运行 另一个消费者它也会卡住,因为它再次从无法读取消息的同一分区读取.

谁能帮我理解我做错了什么?

当您在失败后重新启动 Kafka 源时,会创建一个新的消费者;最终失败源中的消费者被卡夫卡宣布死亡,触发重新平衡。在该重新平衡中,没有外部保证组中的哪个消费者将被分配到哪个分区。这可以解释为什么组中的其他消费者读取该分区。

这里的毒消息破坏消费的问题是我开发了一种偏好,通过使用 ByteArrayDeserializer 将来自 Kafka 的键和值视为 blob,并在流中自己进行反序列化,这使我能够记录(例如,通过日志记录;将消息生成到死信主题以供以后检查也可以)该主题中存在格式错误的消息,并通过提交偏移量继续进行。 Either 在 Scala 中特别适合将格式错误的消息直接移动到提交者。