卡夫卡消费者不消费
Kafka consumer not consuming
这是我从 java 客户端构建 kafka 消费者的代码。
def buildConsumer[Key, Value](
configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value]
): KafkaJavaConsumer[Key, Value] = {
val settingsMap: Map[String, Object] = Map(
"bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}",
"group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"),
"enable.auto.commit" -> "true",
"auto.commit.interval.ms" -> commitInterval.toString,
"auto.offset.reset" -> "earliest"
) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object])
val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer)
consumer.subscribe(Seq(configuration.topic).asJava)
consumer
}
我的 kafka 运行 在端口 6050 上,我已经在控制台中测试它从该特定端口生成和使用。我想知道我的问题是否与我上面的配置有关。我还使用 EmbeddedKafka
框架测试了上面的代码,问题似乎出在实际的 kafka 服务器 运行.
编辑:
我忘了补充一点,我有多个消费者(具有不同的 group.id
)从同一个代理消费,不确定这是否是问题所在。
确保,
No. of partitions in the topic >= No. of consumer instances in the
group
否则,组中的某些消费者实例将不会被分配任何分区。
查看分区数,使用kafka-topics.sh命令
> sh kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
Topic:test PartitionCount:6 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 5 Leader: 0 Replicas: 0 Isr: 0
我仍然不确定问题出在哪里,但是通过删除 zookeeper 数据文件夹和所有 kafka 日志,consumer/producer 开始按预期工作。我认为这可能与我删除日志文件以清除主题的问题有关,而没有使用正式的 kafka 管理工具来删除主题。
这是我从 java 客户端构建 kafka 消费者的代码。
def buildConsumer[Key, Value](
configuration: KafkaConfiguration, commitInterval: Long, groupId: Option[String] = None)(
implicit keyDeserializer: Deserializer[Key], valueDeserializer: Deserializer[Value]
): KafkaJavaConsumer[Key, Value] = {
val settingsMap: Map[String, Object] = Map(
"bootstrap.servers" -> s"${configuration.bootstrapHost}:${configuration.bootstrapPort}",
"group.id" -> groupId.getOrElse(s"${configuration.topic}-${UUID.randomUUID}"),
"enable.auto.commit" -> "true",
"auto.commit.interval.ms" -> commitInterval.toString,
"auto.offset.reset" -> "earliest"
) ++ configuration.additionalOptions.getOrElse(Map.empty[String, Object])
val consumer = new KafkaJavaConsumer[Key, Value](settingsMap.asJava, keyDeserializer, valueDeserializer)
consumer.subscribe(Seq(configuration.topic).asJava)
consumer
}
我的 kafka 运行 在端口 6050 上,我已经在控制台中测试它从该特定端口生成和使用。我想知道我的问题是否与我上面的配置有关。我还使用 EmbeddedKafka
框架测试了上面的代码,问题似乎出在实际的 kafka 服务器 运行.
编辑:
我忘了补充一点,我有多个消费者(具有不同的 group.id
)从同一个代理消费,不确定这是否是问题所在。
确保,
No. of partitions in the topic >= No. of consumer instances in the group
否则,组中的某些消费者实例将不会被分配任何分区。
查看分区数,使用kafka-topics.sh命令
> sh kafka-topics.sh --zookeeper localhost:2181 --topic test --describe
Topic:test PartitionCount:6 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 4 Leader: 0 Replicas: 0 Isr: 0
Topic: test Partition: 5 Leader: 0 Replicas: 0 Isr: 0
我仍然不确定问题出在哪里,但是通过删除 zookeeper 数据文件夹和所有 kafka 日志,consumer/producer 开始按预期工作。我认为这可能与我删除日志文件以清除主题的问题有关,而没有使用正式的 kafka 管理工具来删除主题。