运行 没有指定分区的kafka消费者
Run kafka consumer without specifying partition
最近在学习Kafka,我的consumer消费不到任何记录,除非我指定--parititon 0
参数。换句话说,我不能使用像这样的记录:
kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic
但工作方式如下:
kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic --partition 0
主要问题是,当我转到 java 代码时,我的 KafkaConsumer
class 无法获取记录,我需要知道如何指定 partition
java KafkaConsumer
中的数字 ?!
我当前的 java 代码是:
public class ConsumerDemo {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger((ConsumerDemo.class.getName()));
String bootstrapServer = "127.0.0.10:9092";
String groupId = "my-kafka-java-app";
String topic = "first-topic";
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
//properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic
consumer.subscribe(Collections.singleton(topic)); //means subscribtion to one topic
// poll for new data
while(true){
//consumer.poll(100); old way
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records){
logger.info("Key: " + record.key() + ", Value: "+ record.value() );
logger.info("Partition: " + record.partition() + ", Offset: "+ record.offset());
}
}
}
}
顾名思义,ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
属性的目的是配置一个Partition Assignment Strategy而不是按照命令行的指示设置一个固定的分区。
使用的默认策略是可以更改的 RangeAssignor
,例如 StickyAssignor
,如下所示:
properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,StickyAssignor.class.getName());
您可以阅读有关 Kafka Client Side Assignment Proposal 的更多信息。
你的做法是正确的。订阅主题时不需要指定分区。也许你的消费组已经消费了topic中的所有消息,并且提交了最新的offsets。
确保当您 运行 您的应用程序或创建一个新的消费者组从头开始使用时正在生成新消息(如果您将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为“最早”)
经过大量检查,我的解决方案是使用 consumer.assign
和 consumer.seek
而不是 consumer.subscribe
并且没有指定 groupId。但是我觉得应该有更优的方案
java 代码如下:
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic
//consumer.subscribe(Collections.singleton(topic)); //means subscription to one topic
// using assign and Seek, are mostly used to replay data or fetch a specific msg
TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
long offsetToReadFrom = 15L;
// assign
consumer.assign(Arrays.asList(partitionToReadFrom));
// seek: for a specific offset to read from
consumer.seek(partitionToReadFrom, offsetToReadFrom);
最近在学习Kafka,我的consumer消费不到任何记录,除非我指定--parititon 0
参数。换句话说,我不能使用像这样的记录:
kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic
但工作方式如下:
kafka-console-consumer --bootstrap-server 127.0.0.10:9092 --topic first-topic --partition 0
主要问题是,当我转到 java 代码时,我的 KafkaConsumer
class 无法获取记录,我需要知道如何指定 partition
java KafkaConsumer
中的数字 ?!
我当前的 java 代码是:
public class ConsumerDemo {
public static void main(String[] args) {
Logger logger = LoggerFactory.getLogger((ConsumerDemo.class.getName()));
String bootstrapServer = "127.0.0.10:9092";
String groupId = "my-kafka-java-app";
String topic = "first-topic";
// create consumer configs
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
//properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, partition);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic
consumer.subscribe(Collections.singleton(topic)); //means subscribtion to one topic
// poll for new data
while(true){
//consumer.poll(100); old way
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records){
logger.info("Key: " + record.key() + ", Value: "+ record.value() );
logger.info("Partition: " + record.partition() + ", Offset: "+ record.offset());
}
}
}
}
顾名思义,ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
属性的目的是配置一个Partition Assignment Strategy而不是按照命令行的指示设置一个固定的分区。
使用的默认策略是可以更改的 RangeAssignor
,例如 StickyAssignor
,如下所示:
properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,StickyAssignor.class.getName());
您可以阅读有关 Kafka Client Side Assignment Proposal 的更多信息。
你的做法是正确的。订阅主题时不需要指定分区。也许你的消费组已经消费了topic中的所有消息,并且提交了最新的offsets。
确保当您 运行 您的应用程序或创建一个新的消费者组从头开始使用时正在生成新消息(如果您将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为“最早”)
经过大量检查,我的解决方案是使用 consumer.assign
和 consumer.seek
而不是 consumer.subscribe
并且没有指定 groupId。但是我觉得应该有更优的方案
java 代码如下:
// create consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
// subscribe consumer to our topic
//consumer.subscribe(Collections.singleton(topic)); //means subscription to one topic
// using assign and Seek, are mostly used to replay data or fetch a specific msg
TopicPartition partitionToReadFrom = new TopicPartition(topic, 0);
long offsetToReadFrom = 15L;
// assign
consumer.assign(Arrays.asList(partitionToReadFrom));
// seek: for a specific offset to read from
consumer.seek(partitionToReadFrom, offsetToReadFrom);