我的生产者可以创建一个主题,但数据似乎没有存储在代理中

My producer can create a topic, but data doesn't seem to be stored inside the broker

我的生产者可以创建一个主题,但它似乎没有在代理中存储任何数据。我可以检查主题是否是使用 kafka-topics 脚本创建的。

当我尝试使用 kafka-console-consumer 消费时,它没有消费任何东西。 (我知道--from-beginning。)

当我使用 kafka-console-producer 进行生产时,我的消费者 (kafka-console-consumer) 可以立即使用它。所以我的 java 代码有问题。

当我 运行 我的代码使用 localhost:9092 时,它运行良好。当我使用我的消费者代码使用该主题时,它工作正常。我的制作人在我的本地机器上使用 Kafka 服务器,但不使用远程机器上的另一个 Kafka 服务器。

代码:

//this code is inside the main method
Properties properties = new Properties();
        //properties.put("bootstrap.servers", "localhost:9092");  
        //When I used localhost, my consumer code consumes it fine. 
        properties.put("bootstrap.servers", "192.168.0.30:9092");        
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
        ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
        //topc is created, but consumer can't consume any data. 
        //I tried putting different values for key and value parameters but no avail.

        try {
            kafkaProducer.send(record);
            System.out.println("complete");
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
            System.out.println("closed");
        }
           
        /*//try{
        for(int i = 0; i < 10000; i++){
            System.out.println(i);
            kafkaProducer.send(new ProducerRecord("test", Integer.toString(i), "message - " + i ));
        }*/

我的 CLI(腻子):

我想在我 运行 我的 java 代码时看到我的消费者消费。(图中显示的数据来自生产者脚本。 )


更新

阅读答案和评论后,这就是我迄今为止尝试过的方法。仍然没有消费任何消息。我认为此代码中生成的消息未存储在代理中。我也尝试使用不同的服务器。同样的问题。 Topic创建了,但是consumer group列表中没有consumer,不能消费。消费者脚本无法消费任何数据。

我也尝试过更改权限。 (chown)并尝试使用 etc/hosts 个文件。但没有运气。我会继续努力,直到我解决这个问题。

public static void main(String[] args){
        Properties properties = new Properties();
        //properties.put("bootstrap.servers", "localhost:9092");
        properties.put("bootstrap.servers", "192.168.0.30:9092");        
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("linger.ms", "1");
        properties.put("batch.size", "16384");
        properties.put("request.timeout.ms", "30000");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        
        ProducerRecord<String, String> record = new ProducerRecord<>("test5", "1111","jin1111");
        System.out.println("1");
        try {
            kafkaProducer.send(record);
            //kafkaProducer.send(record).get();
            // implement Callback 
            System.out.println("complete");
            kafkaProducer.flush();
            System.out.println("flush completed");
            
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.flush();
            System.out.println("another flush test");
            kafkaProducer.close();
            System.out.println("closed");
        }       
}   

当我在 Eclipse 中 运行 时,控制台显示:

我的猜测是您的 main 方法退出并且应用程序在 Kafka 客户端发送消息之前结束。 send 方法不同步。客户端缓冲消息并在达到名为逗留时间的超时(参见 linger.ms)或缓冲区填充到特定大小(例如参见 [​​=15=] 参数)后发送它们。默认的延迟时间无论如何都是 0。 因此,您的 main 方法所做的是向 send 方法提供消息,但随后它退出并且 Kafka 客户端中的底层线程无法发送消息。

要完成 ppatierno 答案,您应该在调用 KafkaProducer.close() 之前调用 KafkaProducer.flush()。这是一个阻塞调用,在发送所有记录之前不会return。

雅尼克

我终于明白了。如果您遇到类似的问题,您可以采取一些措施。

在你的 server.properties 中,取消注释这些并输入 ip 和端口。 (好像是端口有问题,所以改了)

listeners=PLAINTEXT://192.168.0.30:9093
advertised.listeners=PLAINTEXT://192.168.0.30:9093

(在使用更改后的 server.properties 重新启动代理之前,您可能想要清除所有现有的 log.dir。如果没有任何效果,请尝试此操作)

您可能需要考虑的其他一些事项:

  • 改变你的 log.dir。通常默认路径是tmp,但有时会有noexec设置,所以配置到不同的位置
  • 检查你的etc/hosts
  • 检查您的许可:并使用 chownchmod
  • 如有必要,更改 zookeeper 端口和 kafka 端口。
  • 改变broker.id

我的工作 生产者 代码:

public class Producer1 {

    public static void main(String[] args){
        Properties properties = new Properties();            
        properties.put("bootstrap.servers", "192.168.0.30:9093");        
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        ProducerRecord<String, String> record = new ProducerRecord<>("test", "1","jin");

        try {           
            kafkaProducer.send(record);
            System.out.println("complete");                     
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            kafkaProducer.close();
            System.out.println("closed");
        }              
    }   
}


工作消费者代码:

public class Consumer1 {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.0.30:9093");
        props.put("group.id", "jin");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Collections.singletonList("test"));

        try {
            while (true) {              

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records){
                    System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());          
                }                           
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            consumer.close();
            System.out.println("closed");
        }   
    }
}

控制台: