我的生产者可以创建一个主题,但数据似乎没有存储在代理中
My producer can create a topic, but data doesn't seem to be stored inside the broker
我的生产者可以创建一个主题,但它似乎没有在代理中存储任何数据。我可以检查主题是否是使用 kafka-topics
脚本创建的。
当我尝试使用 kafka-console-consumer
消费时,它没有消费任何东西。 (我知道--from-beginning
。)
当我使用 kafka-console-producer
进行生产时,我的消费者 (kafka-console-consumer
) 可以立即使用它。所以我的 java 代码有问题。
当我 运行 我的代码使用 localhost:9092
时,它运行良好。当我使用我的消费者代码使用该主题时,它工作正常。我的制作人在我的本地机器上使用 Kafka 服务器,但不使用远程机器上的另一个 Kafka 服务器。
代码:
//this code is inside the main method
Properties properties = new Properties();
//properties.put("bootstrap.servers", "localhost:9092");
//When I used localhost, my consumer code consumes it fine.
properties.put("bootstrap.servers", "192.168.0.30:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
//topc is created, but consumer can't consume any data.
//I tried putting different values for key and value parameters but no avail.
try {
kafkaProducer.send(record);
System.out.println("complete");
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
System.out.println("closed");
}
/*//try{
for(int i = 0; i < 10000; i++){
System.out.println(i);
kafkaProducer.send(new ProducerRecord("test", Integer.toString(i), "message - " + i ));
}*/
我的 CLI(腻子):
我想在我 运行 我的 java 代码时看到我的消费者消费。(图中显示的数据来自生产者脚本。 )
更新
阅读答案和评论后,这就是我迄今为止尝试过的方法。仍然没有消费任何消息。我认为此代码中生成的消息未存储在代理中。我也尝试使用不同的服务器。同样的问题。 Topic创建了,但是consumer group列表中没有consumer,不能消费。消费者脚本无法消费任何数据。
我也尝试过更改权限。 (chown)并尝试使用 etc/hosts 个文件。但没有运气。我会继续努力,直到我解决这个问题。
public static void main(String[] args){
Properties properties = new Properties();
//properties.put("bootstrap.servers", "localhost:9092");
properties.put("bootstrap.servers", "192.168.0.30:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("linger.ms", "1");
properties.put("batch.size", "16384");
properties.put("request.timeout.ms", "30000");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
System.out.println("1");
try {
kafkaProducer.send(record);
//kafkaProducer.send(record).get();
// implement Callback
System.out.println("complete");
kafkaProducer.flush();
System.out.println("flush completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.flush();
System.out.println("another flush test");
kafkaProducer.close();
System.out.println("closed");
}
}
当我在 Eclipse 中 运行 时,控制台显示:
我的猜测是您的 main 方法退出并且应用程序在 Kafka 客户端发送消息之前结束。
send
方法不同步。客户端缓冲消息并在达到名为逗留时间的超时(参见 linger.ms)或缓冲区填充到特定大小(例如参见 [=15=] 参数)后发送它们。默认的延迟时间无论如何都是 0。
因此,您的 main 方法所做的是向 send
方法提供消息,但随后它退出并且 Kafka 客户端中的底层线程无法发送消息。
要完成 ppatierno 答案,您应该在调用 KafkaProducer.close() 之前调用 KafkaProducer.flush()。这是一个阻塞调用,在发送所有记录之前不会return。
雅尼克
我终于明白了。如果您遇到类似的问题,您可以采取一些措施。
在你的 server.properties
中,取消注释这些并输入 ip 和端口。
(好像是端口有问题,所以改了)
listeners=PLAINTEXT://192.168.0.30:9093
advertised.listeners=PLAINTEXT://192.168.0.30:9093
(在使用更改后的 server.properties 重新启动代理之前,您可能想要清除所有现有的 log.dir
。如果没有任何效果,请尝试此操作)
您可能需要考虑的其他一些事项:
- 改变你的
log.dir
。通常默认路径是tmp
,但有时会有noexec
设置,所以配置到不同的位置
- 检查你的
etc/hosts
- 检查您的许可:并使用
chown
和 chmod
- 如有必要,更改 zookeeper 端口和 kafka 端口。
- 改变broker.id
我的工作 生产者 代码:
public class Producer1 {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.0.30:9093");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test", "1","jin");
try {
kafkaProducer.send(record);
System.out.println("complete");
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
System.out.println("closed");
}
}
}
工作消费者代码:
public class Consumer1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.30:9093");
props.put("group.id", "jin");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList("test"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
consumer.close();
System.out.println("closed");
}
}
}
控制台:
我的生产者可以创建一个主题,但它似乎没有在代理中存储任何数据。我可以检查主题是否是使用 kafka-topics
脚本创建的。
当我尝试使用 kafka-console-consumer
消费时,它没有消费任何东西。 (我知道--from-beginning
。)
当我使用 kafka-console-producer
进行生产时,我的消费者 (kafka-console-consumer
) 可以立即使用它。所以我的 java 代码有问题。
当我 运行 我的代码使用 localhost:9092
时,它运行良好。当我使用我的消费者代码使用该主题时,它工作正常。我的制作人在我的本地机器上使用 Kafka 服务器,但不使用远程机器上的另一个 Kafka 服务器。
代码:
//this code is inside the main method
Properties properties = new Properties();
//properties.put("bootstrap.servers", "localhost:9092");
//When I used localhost, my consumer code consumes it fine.
properties.put("bootstrap.servers", "192.168.0.30:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
//topc is created, but consumer can't consume any data.
//I tried putting different values for key and value parameters but no avail.
try {
kafkaProducer.send(record);
System.out.println("complete");
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
System.out.println("closed");
}
/*//try{
for(int i = 0; i < 10000; i++){
System.out.println(i);
kafkaProducer.send(new ProducerRecord("test", Integer.toString(i), "message - " + i ));
}*/
我的 CLI(腻子):
我想在我 运行 我的 java 代码时看到我的消费者消费。(图中显示的数据来自生产者脚本。 )
更新
阅读答案和评论后,这就是我迄今为止尝试过的方法。仍然没有消费任何消息。我认为此代码中生成的消息未存储在代理中。我也尝试使用不同的服务器。同样的问题。 Topic创建了,但是consumer group列表中没有consumer,不能消费。消费者脚本无法消费任何数据。
我也尝试过更改权限。 (chown)并尝试使用 etc/hosts 个文件。但没有运气。我会继续努力,直到我解决这个问题。
public static void main(String[] args){
Properties properties = new Properties();
//properties.put("bootstrap.servers", "localhost:9092");
properties.put("bootstrap.servers", "192.168.0.30:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("linger.ms", "1");
properties.put("batch.size", "16384");
properties.put("request.timeout.ms", "30000");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
System.out.println("1");
try {
kafkaProducer.send(record);
//kafkaProducer.send(record).get();
// implement Callback
System.out.println("complete");
kafkaProducer.flush();
System.out.println("flush completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.flush();
System.out.println("another flush test");
kafkaProducer.close();
System.out.println("closed");
}
}
当我在 Eclipse 中 运行 时,控制台显示:
我的猜测是您的 main 方法退出并且应用程序在 Kafka 客户端发送消息之前结束。
send
方法不同步。客户端缓冲消息并在达到名为逗留时间的超时(参见 linger.ms)或缓冲区填充到特定大小(例如参见 [=15=] 参数)后发送它们。默认的延迟时间无论如何都是 0。
因此,您的 main 方法所做的是向 send
方法提供消息,但随后它退出并且 Kafka 客户端中的底层线程无法发送消息。
要完成 ppatierno 答案,您应该在调用 KafkaProducer.close() 之前调用 KafkaProducer.flush()。这是一个阻塞调用,在发送所有记录之前不会return。
雅尼克
我终于明白了。如果您遇到类似的问题,您可以采取一些措施。
在你的 server.properties
中,取消注释这些并输入 ip 和端口。
(好像是端口有问题,所以改了)
listeners=PLAINTEXT://192.168.0.30:9093
advertised.listeners=PLAINTEXT://192.168.0.30:9093
(在使用更改后的 server.properties 重新启动代理之前,您可能想要清除所有现有的 log.dir
。如果没有任何效果,请尝试此操作)
您可能需要考虑的其他一些事项:
- 改变你的
log.dir
。通常默认路径是tmp
,但有时会有noexec
设置,所以配置到不同的位置 - 检查你的
etc/hosts
- 检查您的许可:并使用
chown
和chmod
- 如有必要,更改 zookeeper 端口和 kafka 端口。
- 改变broker.id
我的工作 生产者 代码:
public class Producer1 {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.0.30:9093");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record = new ProducerRecord<>("test", "1","jin");
try {
kafkaProducer.send(record);
System.out.println("complete");
} catch (Exception e) {
e.printStackTrace();
} finally {
kafkaProducer.close();
System.out.println("closed");
}
}
}
工作消费者代码:
public class Consumer1 {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.0.30:9093");
props.put("group.id", "jin");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Collections.singletonList("test"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
} catch (Exception e){
e.printStackTrace();
} finally {
consumer.close();
System.out.println("closed");
}
}
}
控制台: