{Kafka V 0.10.1}如果异步生产者 Apache Kafka x 发生任何错误,则生产消息大约需要 1 分钟
{Kafka V 0.10.1}Producing message takes approximately 1 minute if any error occurs at async producer Apache Kafka x
我正在使用异步生产者向代理发送消息。如果我的回调中没有任何错误,我会看到生成消息大约需要 0.3 秒。但是当我低于错误 [1] 时,我发现生成消息需要 60 秒。
但我没有看到任何消息丢失。所有消息都在代理中可用。
是什么导致了这个错误?我在每生成 50 条消息时都会看到这种延迟。
出现此错误时如何提高生产者的性能?
代码;
producer.send(new ProducerRecord(topic, this), new ProducerCallback ());
private class ProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
if (ex != null) {
log.error("Error when publishing messages to the topic. Topic :"+ recordMetadata.topic(),ex);
}
}
}
生产者属性
acks=1
linger.ms=10
batch.size=51200
bootstrap.servers=aukk1.xx.com\:9092,aukk2.xx.com\:9092,aukk3.xx.com\:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.xx.KafkaPayloadSerializer
[1]
04:51:20,025 ERROR [org.apache.kafka.clients.producer.internals.RecordBatch] (kafka-producer-network-thread | producer-673) Error executing user-provided callback on message for topic-partition RAW_XML1harveyzhu-1:: java.lang.NullPointerException
at com.lxx.kafkamodels.KafkaPayload$ProducerCallback.onCompletion(KafkaPayload.java:204)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:205)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
at java.lang.Thread.run(Thread.java:745)
如果 ex 不为空,则第一个参数 'RecordMetadata' 为空,这就是为什么您在调用 recordMetadata.topic()
时看到 NPE
使用下面的代码生成记录元数据:
if (ex == null) {
logger.info("Successfully received the details as: \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition:" + recordMetadata.partition() + "\n" +
"Offset" + recordMetadata.offset() + "\n" +
"Timestamp" + recordMetadata.timestamp());
}
else {
logger.error("Can't produce,getting error",e);
我正在使用异步生产者向代理发送消息。如果我的回调中没有任何错误,我会看到生成消息大约需要 0.3 秒。但是当我低于错误 [1] 时,我发现生成消息需要 60 秒。 但我没有看到任何消息丢失。所有消息都在代理中可用。 是什么导致了这个错误?我在每生成 50 条消息时都会看到这种延迟。 出现此错误时如何提高生产者的性能?
代码;
producer.send(new ProducerRecord(topic, this), new ProducerCallback ());
private class ProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception ex) {
if (ex != null) {
log.error("Error when publishing messages to the topic. Topic :"+ recordMetadata.topic(),ex);
}
}
}
生产者属性
acks=1
linger.ms=10
batch.size=51200
bootstrap.servers=aukk1.xx.com\:9092,aukk2.xx.com\:9092,aukk3.xx.com\:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.xx.KafkaPayloadSerializer
[1]
04:51:20,025 ERROR [org.apache.kafka.clients.producer.internals.RecordBatch] (kafka-producer-network-thread | producer-673) Error executing user-provided callback on message for topic-partition RAW_XML1harveyzhu-1:: java.lang.NullPointerException
at com.lxx.kafkamodels.KafkaPayload$ProducerCallback.onCompletion(KafkaPayload.java:204)
at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:155)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:205)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:147)
at java.lang.Thread.run(Thread.java:745)
如果 ex 不为空,则第一个参数 'RecordMetadata' 为空,这就是为什么您在调用 recordMetadata.topic()
时看到 NPE使用下面的代码生成记录元数据:
if (ex == null) {
logger.info("Successfully received the details as: \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition:" + recordMetadata.partition() + "\n" +
"Offset" + recordMetadata.offset() + "\n" +
"Timestamp" + recordMetadata.timestamp());
}
else {
logger.error("Can't produce,getting error",e);