我是卡夫卡的新手。我正在创建的消费者没有收到生产者发布的消息
I am new to kafka. The consumer that I am creating is not getting messages that the producer publishes
我正在创建消费者(其中只有一个消费者的消费者组):
Properties properties = new Properties();
properties.put("zookeeper.connect","localhost:2181");
properties.put("auto.offset.reset", "largest");
properties.put("group.id", groupId);
properties.put("auto.commit.enable", "true");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
consumerMap.entrySet().stream().forEach(
streams -> {
streams.getValue().stream().forEach(
stream -> {
KafkaBasicConsumer customConsumer = new KafkaBasicConsumer();
try {
Future<?> consumerFuture = kafkaConsumerExecutor.submit(customConsumer);
kafkaConsumersFuture.put(groupId, consumerFuture);
} catch (Exception e) {
logger.error("---- Got error : "+ e.getMessage());
logger.error("Exception : ", e);
}
}
);
}
);
我为同一主题订阅了 2 个消费者。
我通过存储其未来对象然后调用取消订阅消费者
consumerFuture.cancel(Boolean.TRUE);
现在我用上面的代码再次订阅同一个消费者,它就成功注册了。
但是,当发布者现在发布时,新订阅的消费者没有收到消息,而注册的另一个消费者正在收到消息
我也在检查消费者的偏移量,当生产者发布但消费者没有收到消息时,它们会得到更新。
制作前:
组主题 Pid 偏移量 logSize 滞后
A T1 0 94 94 1
组主题 Pid 偏移量 logSize 滞后
B T1 0 94 94 1
生产后:
组主题 Pid 偏移量 logSize 滞后
A T1 0 95 97 2
组主题 Pid 偏移量 logSize 滞后
B T1 0 94 97 2
我无法弄清楚这是生产者方面的问题(分区不够)还是我以错误的方式创建了消费者
另外,我无法弄清楚日志和滞后列在这意味着什么。
如果有人可以提供帮助或需要更多详细信息,请告诉我。
我找到了解决问题的方法,感谢@nautilus 提醒更新。
我的主要目的是提供端点来订阅和取消订阅 kafka 中的消费者。
由于 kafka 仅提供订阅而不提供取消订阅(只能手动进行),因此我不得不在 kafka 实现上编写层。
我将消费者对象存储在静态映射中,键为组 ID(因为我的消费者组只能有一个消费者)
问题是我没有关闭取消订阅时创建的消费者,并且具有相同组 ID 的旧消费者阻止新消费者接收消息
私有静态地图kafkaConsumersFuture
根据一些参数,找出组id
kafkaConsumersFuture.put(groupId, consumerConnector);
我退订了
ConsumerConnector consumerConnector = kafkaConsumersFuture.get(groupId);
if(consumerConnector!=null) {
consumerConnector.shutdown();
kafkaConsumersFuture.remove(groupId);
}
我正在创建消费者(其中只有一个消费者的消费者组):
Properties properties = new Properties();
properties.put("zookeeper.connect","localhost:2181");
properties.put("auto.offset.reset", "largest");
properties.put("group.id", groupId);
properties.put("auto.commit.enable", "true");
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
consumerMap.entrySet().stream().forEach(
streams -> {
streams.getValue().stream().forEach(
stream -> {
KafkaBasicConsumer customConsumer = new KafkaBasicConsumer();
try {
Future<?> consumerFuture = kafkaConsumerExecutor.submit(customConsumer);
kafkaConsumersFuture.put(groupId, consumerFuture);
} catch (Exception e) {
logger.error("---- Got error : "+ e.getMessage());
logger.error("Exception : ", e);
}
}
);
}
);
我为同一主题订阅了 2 个消费者。 我通过存储其未来对象然后调用取消订阅消费者 consumerFuture.cancel(Boolean.TRUE);
现在我用上面的代码再次订阅同一个消费者,它就成功注册了。 但是,当发布者现在发布时,新订阅的消费者没有收到消息,而注册的另一个消费者正在收到消息
我也在检查消费者的偏移量,当生产者发布但消费者没有收到消息时,它们会得到更新。 制作前:
组主题 Pid 偏移量 logSize 滞后
A T1 0 94 94 1
组主题 Pid 偏移量 logSize 滞后
B T1 0 94 94 1
生产后:
组主题 Pid 偏移量 logSize 滞后
A T1 0 95 97 2
组主题 Pid 偏移量 logSize 滞后
B T1 0 94 97 2
我无法弄清楚这是生产者方面的问题(分区不够)还是我以错误的方式创建了消费者 另外,我无法弄清楚日志和滞后列在这意味着什么。
如果有人可以提供帮助或需要更多详细信息,请告诉我。
我找到了解决问题的方法,感谢@nautilus 提醒更新。
我的主要目的是提供端点来订阅和取消订阅 kafka 中的消费者。 由于 kafka 仅提供订阅而不提供取消订阅(只能手动进行),因此我不得不在 kafka 实现上编写层。
我将消费者对象存储在静态映射中,键为组 ID(因为我的消费者组只能有一个消费者)
问题是我没有关闭取消订阅时创建的消费者,并且具有相同组 ID 的旧消费者阻止新消费者接收消息
私有静态地图kafkaConsumersFuture
根据一些参数,找出组id
kafkaConsumersFuture.put(groupId, consumerConnector);
我退订了
ConsumerConnector consumerConnector = kafkaConsumersFuture.get(groupId);
if(consumerConnector!=null) {
consumerConnector.shutdown();
kafkaConsumersFuture.remove(groupId);
}