Kafka 0.9 如何在使用 KafkaConsumer 手动提交偏移量时重新使用消息
Kafka 0.9 How to re-consume message when manually committing offset with a KafkaConsumer
我正在编写一个消费者,一旦一系列记录被提交到 Mongo,它就会手动提交偏移量。
在 Mongo 错误或任何其他错误的情况下,将尝试将记录保存到错误处理集合中
以便日后重播。
如果 Mongo 关闭,那么我希望消费者在尝试从 Kakfa 的未提交偏移量中读取记录之前停止处理一段时间。
下面的示例有效,但我想知道这种情况的最佳做法是什么?
while (true) {
boolean commit = false;
try {
ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
kafkaMessageProcessor.processRecords(records);
commit = true;
}
catch (Exception e) {
logger.error("Unable to consume closing consumer and restarting", e);
try {
consumer.close();
}
catch (Exception consumerCloseError) {
logger.error("Unable to close consumer", consumerCloseError);
}
logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
Thread.sleep(recoveryInterval);
consumer = createConsumer(properties);
}
if (commit) {
consumer.commitSync();
}
}
private KafkaConsumer<K, V> createConsumer(Properties properties) {
KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
consumer.subscribe(topics);
return consumer;
}
如果我不重新创建消费者,我会收到以下错误。
o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
如果您没有提交偏移量并且 auto.commit.enable 属性 为假,那么当对 Mongo 的调用失败时,您只需等待您认为必要的时间并重试轮询()。
您看到的问题是新消费者使用 poll() 作为心跳机制,所以如果您等待超时请求的时间更长,那么主题的协调器将踢出消费者,因为它会认为已经死了,它将重新平衡该组。所以等待 mongo 但你可能想过一会儿轮询()。
编辑:作为解决方法,您可以将此 属性 提高 request.timeout.ms
希望对您有所帮助!
据我了解,(新)客户端是保持消耗的偏移量的客户端。提交将偏移量发送到服务器,但它对该客户端的下一次轮询没有影响,因为客户端对服务器说 "give me next messages on THAT offset"。
为什么将偏移量发送到服务器?为下一次再平衡。因此,服务器使用提交的偏移量的唯一情况是当某些客户端 dies/disconnects - 然后重新平衡分区,并且通过这种重新平衡,客户端从服务器获得偏移量。
所以如果你不提交offset然后调用poll(),你就不能指望消息会被再次读取。为此,必须有可能回滚客户端中的偏移量。我没有尝试,但我认为调用 KafkaConsumer.seek 到失败消息的偏移量应该可以解决问题。
顺便说一句,这样你甚至可以提交最后一个成功处理的消息并寻找第一个失败的消息,这样你就不需要重复整个记录列表,当某些消息在中间发生失败时它。
这是我使用客户端版本 0.10.0 的代码。
看来满足你的需求。
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageProcesser implements Runnable {
private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class);
private final ExecutorService pool = Executors.newFixedThreadPool(4);
private final KafkaConsumer<String, String> consumer;
private final String topic;
private final AtomicBoolean closed = new AtomicBoolean(false);
public MessageProcesser(String groupId, String topic, String kafkaServer) {
this.topic = topic;
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "false");
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(Collections.singleton(topic));
while (true) {
if (closed.get()) {
consumer.close();
}
ConsumerRecords<String, String> records = consumer.poll(1000 * 60);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
if (null == value) {
continue;
}
boolean processResult = false;
try {
Future<Object> f = pool.submit(new ProcessCommand(value));
processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (!processResult) {
//here if process fail, seek to current offset
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
} else {
this.commitAsyncOffset(record);
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (!closed.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
// ignore
}
}
}
}
public void shutdown() {
closed.set(true);
}
public void commitAsyncOffset(ConsumerRecord<String, String> record) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
consumer.commitAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
logger.error("kafka offset commit fail. {} {}", offsets, PushUtil.getStackString(e.getStackTrace()));
}
}
});
}
}
我正在编写一个消费者,一旦一系列记录被提交到 Mongo,它就会手动提交偏移量。
在 Mongo 错误或任何其他错误的情况下,将尝试将记录保存到错误处理集合中
以便日后重播。
如果 Mongo 关闭,那么我希望消费者在尝试从 Kakfa 的未提交偏移量中读取记录之前停止处理一段时间。
下面的示例有效,但我想知道这种情况的最佳做法是什么?
while (true) {
boolean commit = false;
try {
ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
kafkaMessageProcessor.processRecords(records);
commit = true;
}
catch (Exception e) {
logger.error("Unable to consume closing consumer and restarting", e);
try {
consumer.close();
}
catch (Exception consumerCloseError) {
logger.error("Unable to close consumer", consumerCloseError);
}
logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
Thread.sleep(recoveryInterval);
consumer = createConsumer(properties);
}
if (commit) {
consumer.commitSync();
}
}
private KafkaConsumer<K, V> createConsumer(Properties properties) {
KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
consumer.subscribe(topics);
return consumer;
}
如果我不重新创建消费者,我会收到以下错误。
o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
如果您没有提交偏移量并且 auto.commit.enable 属性 为假,那么当对 Mongo 的调用失败时,您只需等待您认为必要的时间并重试轮询()。
您看到的问题是新消费者使用 poll() 作为心跳机制,所以如果您等待超时请求的时间更长,那么主题的协调器将踢出消费者,因为它会认为已经死了,它将重新平衡该组。所以等待 mongo 但你可能想过一会儿轮询()。
编辑:作为解决方法,您可以将此 属性 提高 request.timeout.ms
希望对您有所帮助!
据我了解,(新)客户端是保持消耗的偏移量的客户端。提交将偏移量发送到服务器,但它对该客户端的下一次轮询没有影响,因为客户端对服务器说 "give me next messages on THAT offset"。 为什么将偏移量发送到服务器?为下一次再平衡。因此,服务器使用提交的偏移量的唯一情况是当某些客户端 dies/disconnects - 然后重新平衡分区,并且通过这种重新平衡,客户端从服务器获得偏移量。
所以如果你不提交offset然后调用poll(),你就不能指望消息会被再次读取。为此,必须有可能回滚客户端中的偏移量。我没有尝试,但我认为调用 KafkaConsumer.seek 到失败消息的偏移量应该可以解决问题。
顺便说一句,这样你甚至可以提交最后一个成功处理的消息并寻找第一个失败的消息,这样你就不需要重复整个记录列表,当某些消息在中间发生失败时它。
这是我使用客户端版本 0.10.0 的代码。
看来满足你的需求。
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MessageProcesser implements Runnable {
private static Logger logger = LoggerFactory.getLogger(MessageProcesser.class);
private final ExecutorService pool = Executors.newFixedThreadPool(4);
private final KafkaConsumer<String, String> consumer;
private final String topic;
private final AtomicBoolean closed = new AtomicBoolean(false);
public MessageProcesser(String groupId, String topic, String kafkaServer) {
this.topic = topic;
Properties props = new Properties();
props.put("bootstrap.servers", kafkaServer);
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "false");
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(Collections.singleton(topic));
while (true) {
if (closed.get()) {
consumer.close();
}
ConsumerRecords<String, String> records = consumer.poll(1000 * 60);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
if (null == value) {
continue;
}
boolean processResult = false;
try {
Future<Object> f = pool.submit(new ProcessCommand(value));
processResult = (boolean) f.get(100, TimeUnit.MILLISECONDS);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (!processResult) {
//here if process fail, seek to current offset
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
} else {
this.commitAsyncOffset(record);
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
if (!closed.get()) {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
// ignore
}
}
}
}
public void shutdown() {
closed.set(true);
}
public void commitAsyncOffset(ConsumerRecord<String, String> record) {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));
consumer.commitAsync(offsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null) {
logger.error("kafka offset commit fail. {} {}", offsets, PushUtil.getStackString(e.getStackTrace()));
}
}
});
}
}