Kafka upgrade to 0.9 with new consumer API
We are upgrading our Kafka implementation to 0.9 and using the new consumer Java API to create the consumer. I am using the consumer code below; we subscribe the consumer to the topic at LINE A, and LINE B is the call to our service that processes the messages we receive. The problem now is that if our message processing takes more than 30 seconds, we get an exception.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("heartbeat.interval.ms", "1000");
props.put("receive.buffer.bytes", 10485760);
props.put("fetch.message.max.bytes", 5242880); // old-consumer property; the new consumer's equivalent is max.partition.fetch.bytes
props.put("enable.auto.commit", false);
// deserializers are required by the new consumer (added here so the snippet runs)
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props);
// with a partition assigned explicitly to the consumer:
// TopicPartition partition0 = new TopicPartition("TEST-TOPIC", 0);
// consumer.assign(Arrays.asList(partition0));

// assign the topic to the consumer without a partition
// LINE A
consumer.subscribe(Arrays.asList("TEST-TOPIC"), new ConsumerRebalanceListenerImp());
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    try {
        ConsumerRecords<Object, Object> records = consumer.poll(1000);
        consumeFromQueue(records); // LINE B
        consumer.commitSync();
    } catch (CommitFailedException e) {
        e.printStackTrace();
        System.out.println("CommitFailedException");
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("Exception while consuming messages");
    }
}
The exception is:
2016-03-03 10:47:35.095 INFO 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
2016-03-03 10:47:35.096 ERROR 6448 --- [ask-scheduler-3] o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group TEST-GROUP
CommitFailedException
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
The above exception occurs while committing offsets. Any suggestions would be helpful, thanks.
This happens because the new consumer is single-threaded, and the only way it can maintain the heartbeat with the consumer group is by polling or committing offsets; after 30 seconds the group coordinator marks your consumer as dead and calls for a group rebalance.
For this situation you can increase request.timeout.ms, as in the sketch below.
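For illustration, a minimal config sketch: in the 0.9 consumer the 30-second ceiling observed above matches the session timeout (session.timeout.ms defaults to 30000), and request.timeout.ms should stay above it, so the two are usually raised together. The numbers are placeholders, not recommendations.

props.put("session.timeout.ms", "120000");  // default 30000 -- matches the 30 s observed above
props.put("request.timeout.ms", "130000");  // keep above session.timeout.ms (newer clients enforce this)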
Alternatively, you can split the consuming and processing work between two threads, as in the sketch after this paragraph.
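A minimal two-thread sketch, assuming the 0.9 varargs pause()/resume() API, a hypothetical class name PausingConsumer, and a hypothetical processRecords() standing in for the question's consumeFromQueue(): the polling thread keeps calling poll() (which carries the heartbeat) while the partitions are paused, and a single worker thread does the slow processing; offsets are committed once the worker finishes.

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("TEST-TOPIC"));
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<?> inFlight = null;

        while (true) {
            // poll() doubles as the heartbeat; with partitions paused it
            // returns no records, so the loop stays alive while work is in flight
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if (!records.isEmpty()) {
                // 0.9 API takes varargs; 0.10+ takes a Collection instead
                consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));
                inFlight = worker.submit(() -> processRecords(records));
            }
            if (inFlight != null && inFlight.isDone()) {
                consumer.commitSync(); // commit only after processing finished
                consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));
                inFlight = null;
            }
        }
    }

    // hypothetical stand-in for the question's slow consumeFromQueue(records) call
    private static void processRecords(ConsumerRecords<String, String> records) {
        // slow per-record work goes here
    }
}

Note this is only a sketch: a rebalance clears the paused state, so production code would re-pause inside a ConsumerRebalanceListener and deal with the in-flight future when partitions are revoked.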
You can limit the number of messages returned by poll() by setting max.partition.fetch.bytes to a suitable threshold that is larger than your largest message, but low enough that each poll fetches fewer messages.
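For example (illustrative numbers, assuming messages up to about 1 MB):

props.put("max.partition.fetch.bytes", 2097152); // 2 MB: larger than the biggest message, small enough that each poll() returns only a few records per partition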
Kafka 0.10.x supports explicitly limiting the number of records returned to the client by setting max.poll.records.
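For example (illustrative value):

props.put("max.poll.records", 10); // at most 10 records per poll()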