为什么关闭 Kafka Producer (producer.close()) 会阻塞并需要几分钟
Why might closing a Kafka Producer (producer.close()) block and takes a few minutes
朋友们,下面的代码运行没有问题。然而,尽管生成 1-million
消息需要不到一秒,但生产者关闭 (producer.close()
) 需要大约两 (qty. 2) 分钟才能完成(它阻塞)。我 运行 3-Broker Confluent Kafka cluster
在结实的 Fedora
PC 上,硬件性能似乎不是问题。
我想知道是否有 Kafka 配置导致此问题。我有以下简单的 topic-partition
模式:
Topic: myTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
和min.insync.replicas=1
见下文。有任何想法吗?提前致谢!
package com.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class MyProducer {
public static final Logger logger = LogManager.getLogger();
public static void main (String[] args) {
logger.info("Creating Kafka Producer...");
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
// The following loop takes less than a second to complete.
logger.info("Start ...");
for(int i = 0; i < AppConfigs.numEvents; i++) {
producer.send(new ProducerRecord<>(AppConfigs.topicName, i, "msg" + i));
}
logger.info("End. Closing producer.");
producer.close(); // However this blocks.
System.out.println("I am here!"); // Getting here takes about ~2 minutes.
}
}
The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
这意味着在缓冲区满之前,您将毫无延迟地(异步)从应用程序发送消息。但在关闭时,您将等待所有消息成功传送到服务器。
并且 min.insync.replicas=X
允许 acks=all
请求在分区的至少 x 个副本同步时继续工作。在这里,我们看到了一个有两个副本的例子。
你可以设置ask = 0,这将使你达到最大吞吐量,因为只有网络容量会有限制,但如果出现故障,消息可能会丢失。你总是要做出妥协)
朋友们,下面的代码运行没有问题。然而,尽管生成 1-million
消息需要不到一秒,但生产者关闭 (producer.close()
) 需要大约两 (qty. 2) 分钟才能完成(它阻塞)。我 运行 3-Broker Confluent Kafka cluster
在结实的 Fedora
PC 上,硬件性能似乎不是问题。
我想知道是否有 Kafka 配置导致此问题。我有以下简单的 topic-partition
模式:
Topic: myTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
和min.insync.replicas=1
见下文。有任何想法吗?提前致谢!
package com.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
public class MyProducer {
public static final Logger logger = LogManager.getLogger();
public static void main (String[] args) {
logger.info("Creating Kafka Producer...");
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, AppConfigs.applicationID);
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfigs.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(props);
// The following loop takes less than a second to complete.
logger.info("Start ...");
for(int i = 0; i < AppConfigs.numEvents; i++) {
producer.send(new ProducerRecord<>(AppConfigs.topicName, i, "msg" + i));
}
logger.info("End. Closing producer.");
producer.close(); // However this blocks.
System.out.println("I am here!"); // Getting here takes about ~2 minutes.
}
}
The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources.
The send() method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency.
这意味着在缓冲区满之前,您将毫无延迟地(异步)从应用程序发送消息。但在关闭时,您将等待所有消息成功传送到服务器。
并且 min.insync.replicas=X
允许 acks=all
请求在分区的至少 x 个副本同步时继续工作。在这里,我们看到了一个有两个副本的例子。
你可以设置ask = 0,这将使你达到最大吞吐量,因为只有网络容量会有限制,但如果出现故障,消息可能会丢失。你总是要做出妥协)