KafkaProducer 未成功将消息发送到队列中
KafkaProducer not successfully sending message into the queue
我在我的 Windows PC 上构建了一个小型测试环境,并写下了以下用于测试 kafka 的代码(使用 kafka_2.10:0.9.0.1 来自 org.apache.kafka)。
package iii.functiontesting;
import java.text.ParseException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Hello world!
*
*/
public class test4
{
public static void main( String[] args ) throws ParseException
{
Properties producerProps=new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("serializer.class",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("request.required.acks","1");
KafkaProducer<String,String> kafkawriter= new KafkaProducer<String,String>(producerProps);
ProducerRecord<String,String> msg=new ProducerRecord<>("TEST3","ImKey","teststring1");
kafkawriter.send(msg);
}
}
我使用如下命令检查消息是否正确写入队列
D:\Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\kafka-console-consumer.bat
--zookeeper localhost:2181 --topic TEST3 --from-beginning
但是我发现kafka-console-consumer什么也没有显示
我一直怀疑我的kafka服务器没有正常运行,所以我用console-producer来测试
D:\Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\kafka-console-producer.bat
--broker-list localhost:9092 --topic TEST3
aaaaa
这次在console-consumer下可以清楚的看到aaaaa了。
我不知道会发生什么。
谁能帮帮我?
您必须在终止程序之前调用 KafkaProducer#flush
[或] KafkaProducer#close
方法。
实际上,生产者在将记录发送给代理之前会对其进行缓冲。请参阅 Kafka Producer configuration
中的 buffer.memory
和 batch.size
kafkawriter.send(msg);
kafkawriter.close();
我在我的 Windows PC 上构建了一个小型测试环境,并写下了以下用于测试 kafka 的代码(使用 kafka_2.10:0.9.0.1 来自 org.apache.kafka)。
package iii.functiontesting;
import java.text.ParseException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* Hello world!
*
*/
public class test4
{
public static void main( String[] args ) throws ParseException
{
Properties producerProps=new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("serializer.class",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
producerProps.put("request.required.acks","1");
KafkaProducer<String,String> kafkawriter= new KafkaProducer<String,String>(producerProps);
ProducerRecord<String,String> msg=new ProducerRecord<>("TEST3","ImKey","teststring1");
kafkawriter.send(msg);
}
}
我使用如下命令检查消息是否正确写入队列
D:\Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\kafka-console-consumer.bat --zookeeper localhost:2181 --topic TEST3 --from-beginning
但是我发现kafka-console-consumer什么也没有显示
我一直怀疑我的kafka服务器没有正常运行,所以我用console-producer来测试
D:\Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\kafka-console-producer.bat --broker-list localhost:9092 --topic TEST3
aaaaa
这次在console-consumer下可以清楚的看到aaaaa了。 我不知道会发生什么。 谁能帮帮我?
您必须在终止程序之前调用 KafkaProducer#flush
[或] KafkaProducer#close
方法。
实际上,生产者在将记录发送给代理之前会对其进行缓冲。请参阅 Kafka Producer configuration
中的buffer.memory
和 batch.size
kafkawriter.send(msg);
kafkawriter.close();