当主线程睡眠小于 1000 时无法生成消息
Cannot produce Message when Main Thread sleep less than 1000
当我使用Kafka的Java API时,如果我让我的主线程休眠少于2000ns,它不能产生任何结果message.I真的很想知道为什么会这样?
这是我的制作人:
public class Producer {
private final KafkaProducer<String, String> producer;
private final String topic;
public Producer(String topic, String[] args) {
//......
//......
producer = new KafkaProducer<>(props);
this.topic = topic;
}
public void producerMsg() throws InterruptedException {
String data = "Apache Storm is a free and open source distributed";
data = data.replaceAll("[\pP‘’“”]", "");
String[] words = data.split(" ");
Random _rand = new Random();
Random rnd = new Random();
int events = 10;
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
int lastIPnum = rnd.nextInt(255);
String ip = "192.168.2." + lastIPnum;
String msg = words[_rand.nextInt(words.length)];
try {
producer.send(new ProducerRecord<>(topic, ip, msg));
System.out.println("Sent message: (" + ip + ", " + msg + ")");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Producer producer = new Producer(Constants.TOPIC, args);
producer.producerMsg();
//If I write Thread.sleep(1000),It will not work!!!!!!!!!!!!!!!!!!!!
Thread.sleep(2000);
}
}
感谢
你能展示一下你用来配置 Producer 的道具吗?我只是猜测有可能...
在 producerMsg() 中,您使用异步方式来使用生产者,所以只需 producer.send() 这意味着消息被放入一个内部缓冲区中,用于制作稍后将发送的批次。生产者有一个内部线程从缓冲区获取并发送批次。可能只有1000ms还不足以达到生产者真正发送消息的状态(参见batch.size和linger.ms),主应用程序结束,生产者没有发送消息就死了。给它更多时间(2000 毫秒),它会起作用。顺便说一句,我没有尝试代码。
所以原因似乎是你的:
props.put("linger.ms", 1000);
与您的睡眠相匹配。因此生产者将在 1000 毫秒后开始发送消息,因为批处理尚未满(batch.size 为 16 MB)。同时,主线程休眠 1 秒后结束,生产者不发送消息。你必须使用更小的 linger.ms 时间。
当我使用Kafka的Java API时,如果我让我的主线程休眠少于2000ns,它不能产生任何结果message.I真的很想知道为什么会这样?
这是我的制作人:
public class Producer {
private final KafkaProducer<String, String> producer;
private final String topic;
public Producer(String topic, String[] args) {
//......
//......
producer = new KafkaProducer<>(props);
this.topic = topic;
}
public void producerMsg() throws InterruptedException {
String data = "Apache Storm is a free and open source distributed";
data = data.replaceAll("[\pP‘’“”]", "");
String[] words = data.split(" ");
Random _rand = new Random();
Random rnd = new Random();
int events = 10;
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
int lastIPnum = rnd.nextInt(255);
String ip = "192.168.2." + lastIPnum;
String msg = words[_rand.nextInt(words.length)];
try {
producer.send(new ProducerRecord<>(topic, ip, msg));
System.out.println("Sent message: (" + ip + ", " + msg + ")");
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Producer producer = new Producer(Constants.TOPIC, args);
producer.producerMsg();
//If I write Thread.sleep(1000),It will not work!!!!!!!!!!!!!!!!!!!!
Thread.sleep(2000);
}
}
感谢
你能展示一下你用来配置 Producer 的道具吗?我只是猜测有可能...
在 producerMsg() 中,您使用异步方式来使用生产者,所以只需 producer.send() 这意味着消息被放入一个内部缓冲区中,用于制作稍后将发送的批次。生产者有一个内部线程从缓冲区获取并发送批次。可能只有1000ms还不足以达到生产者真正发送消息的状态(参见batch.size和linger.ms),主应用程序结束,生产者没有发送消息就死了。给它更多时间(2000 毫秒),它会起作用。顺便说一句,我没有尝试代码。
所以原因似乎是你的:
props.put("linger.ms", 1000);
与您的睡眠相匹配。因此生产者将在 1000 毫秒后开始发送消息,因为批处理尚未满(batch.size 为 16 MB)。同时,主线程休眠 1 秒后结束,生产者不发送消息。你必须使用更小的 linger.ms 时间。