KafkaConsumer 0.10 Java API 错误消息:没有分区的当前分配
KafkaConsumer 0.10 Java API error message: No current assignment for partition
我正在使用 KafkaConsumer 0.10 Java api。我想从特定分区和特定偏移量中使用。我查了一下,发现有一个seek方法,但是它抛出了一个异常。有人有类似的用例或解决方案吗?
代码:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
异常
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)
在你可以seek()
之前,你首先需要subscribe()
到一个主题或assign()
向消费者划分一个主题。还请记住,subscribe()
和 assign()
是惰性的——因此,在使用 seek()
之前,您还需要执行 "dummy call" 到 poll()
。
Note: as of Kafka 2.0, the new poll(Duration timeout)
is async and it's not guaranteed that you have a complete assignment when poll
returns. Thus, you might need to check your assignment before using seek()
and also poll
again to refresh the assignment. (Cf. KIP-266 for details)
如果你使用subscribe()
,你使用组管理:因此,你可以使用相同的group.id
启动多个消费者,主题的所有分区将平均分配给组内的所有消费者自动(每个分区将分配给组中的单个消费者)。
如果要读取特定分区,需要通过assign()
使用手动赋值。这允许你做任何你想做的作业。
顺便说一句:KafkaConsumer
有一个很长的详细 [=39=] JavaDoc,包括示例。值得一读。
如果您不想使用 poll() 和检索地图记录,并更改偏移量本身。
卡夫卡版本 0.11
试试这个:
...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList());
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator");
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings
consumer.seekToBeginning(partitions); //or other seek
轮询协调器事件。这确保协调器是已知的并且消费者已加入该组(如果它正在使用组管理)。如果启用,这也会处理定期偏移提交。
请使用 consumer。assign with consumer。seek 而不是 consumer。subscribe
经过这些更改后,它将正常执行。
我正在使用 KafkaConsumer 0.10 Java api。我想从特定分区和特定偏移量中使用。我查了一下,发现有一个seek方法,但是它抛出了一个异常。有人有类似的用例或解决方案吗?
代码:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
异常
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
at xx.xxx.xxx.Test.main(Test.java:182)
在你可以seek()
之前,你首先需要subscribe()
到一个主题或assign()
向消费者划分一个主题。还请记住,subscribe()
和 assign()
是惰性的——因此,在使用 seek()
之前,您还需要执行 "dummy call" 到 poll()
。
Note: as of Kafka 2.0, the new
poll(Duration timeout)
is async and it's not guaranteed that you have a complete assignment whenpoll
returns. Thus, you might need to check your assignment before usingseek()
and alsopoll
again to refresh the assignment. (Cf. KIP-266 for details)
如果你使用subscribe()
,你使用组管理:因此,你可以使用相同的group.id
启动多个消费者,主题的所有分区将平均分配给组内的所有消费者自动(每个分区将分配给组中的单个消费者)。
如果要读取特定分区,需要通过assign()
使用手动赋值。这允许你做任何你想做的作业。
顺便说一句:KafkaConsumer
有一个很长的详细 [=39=] JavaDoc,包括示例。值得一读。
如果您不想使用 poll() 和检索地图记录,并更改偏移量本身。 卡夫卡版本 0.11 试试这个:
...
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("Test_topic1", "Test_topic2"));
List<TopicPartition> partitions =consumer.partitionsFor("Test_topic1").stream().map(part->{TopicPartition tp = new TopicPartition(part.topic(),part.partition()); return tp;}).collect(Collectors.toList());
Field coordinatorField = consumer.getClass().getDeclaredField("coordinator");
coordinatorField.setAccessible(true);
ConsumerCoordinator coordinator = (ConsumerCoordinator)coordinatorField.get(consumer);
coordinator.poll(new Date().getTime(), 1000);//Watch out for your local date and time settings
consumer.seekToBeginning(partitions); //or other seek
轮询协调器事件。这确保协调器是已知的并且消费者已加入该组(如果它正在使用组管理)。如果启用,这也会处理定期偏移提交。
请使用 consumer。assign with consumer。seek 而不是 consumer。subscribe
经过这些更改后,它将正常执行。