卡夫卡消费者不消费

Kafka consumer not consuming

这是我从 java 客户端构建 kafka 消费者的代码。

  def buildConsumer[Key, Value](
    configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
    implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value]
  ): KafkaJavaConsumer[Key, Value] = {
    val settingsMap: Map[String, Object] = Map(
      "bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}",
      "group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"),
      "enable.auto.commit" -> "true",
      "auto.commit.interval.ms" -> commitInterval.toString,
      "auto.offset.reset" -> "earliest"
    ) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object])
    val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer)
    consumer.subscribe(Seq(configuration.topic).asJava)
    consumer
  }

我的 kafka 运行 在端口 6050 上,我已经在控制台中测试它从该特定端口生成和使用。我想知道我的问题是否与我上面的配置有关。我还使用 EmbeddedKafka 框架测试了上面的代码,问题似乎出在实际的 kafka 服务器 运行.

编辑:

我忘了补充一点,我有多个消费者(具有不同的 group.id)从同一个代理消费,不确定这是否是问题所在。

确保,

No. of partitions in the topic >= No. of consumer instances in the group

否则,组中的某些消费者实例将不会被分配任何分区。

查看分区数,使用kafka-topics.sh命令

> sh kafka-topics.sh --zookeeper localhost:2181 --topic test --describe Topic:test PartitionCount:6 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 3 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 4 Leader: 0 Replicas: 0 Isr: 0 Topic: test Partition: 5 Leader: 0 Replicas: 0 Isr: 0

我仍然不确定问题出在哪里,但是通过删除 zookeeper 数据文件夹和所有 kafka 日志,consumer/producer 开始按预期工作。我认为这可能与我删除日志文件以清除主题的问题有关,而没有使用正式的 kafka 管理工具来删除主题。