kafka.common.FailedToSendMessageException: kafka produce error

I am reading a JSON file and trying to produce its contents to Kafka. Here is my code:

public class FlatFileDataProducer {

    private String topic = "JsonTopic";
    private Producer<String, String> producer = null;
    KeyedMessage<String, String> message = null;
    public JsonReader reader;

    public void run(String jsonPath) throws ClassNotFoundException, FileNotFoundException, IOException, ParseException{
        reader = new JsonReader();
        System.out.println("---------------------");
        System.out.println("JSON FILE PATH IS : "+jsonPath);
        System.out.println("---------------------");
        Properties prop = new Properties();
        prop.put("metadata.broker.list", "192.168.63.145:9092");
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        // prop.put("partitioner.class", "example.producer.SimplePartitioner");
        prop.put("request.required.acks", "1");


        ProducerConfig config = new ProducerConfig(prop);
        producer = new Producer<String, String>(config);
        List<Employee> emp = reader.readJsonFile(jsonPath);     
        for (Employee employee : emp) 
        {
            System.out.println("---------------------");
            System.out.println(employee.toString());
            System.out.println("---------------------");
            message = new KeyedMessage<String, String>(topic, employee.toString());

            producer.send(message);
        }
        // Close the producer once, after all messages have been sent.
        producer.close();
        System.out.println("Messages to Kafka successfully");
    }

The code that reads the JSON file is:

public List<Employee> readJsonFile(String path) throws FileNotFoundException, IOException, ParseException{
        Employee employee = new Employee();
        parser=new JSONParser();
        Object obj = parser.parse(new FileReader(path));
        JSONObject jsonObject = (JSONObject) obj;
        employee.setId(Integer.parseInt(jsonObject.get("id").toString()));      
        employee.setName((String)jsonObject.get("name"));
        employee.setSalary(Integer.parseInt(jsonObject.get("salary").toString()));
        list.add(employee);
        return list;
    }

But when I run the program, I get the following error. Problem 1:

> [root@sandbox ~]# java -jar sparkkafka.jar /root/customer.json
> JSON FILE PATH IS : /root/customer.json
> log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
> log4j:WARN Please initialize the log4j system properly.
> 1,Smith,25
> Exception in thread "main" kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
>         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
>         at kafka.producer.Producer.send(Producer.scala:77)
>         at kafka.javaapi.producer.Producer.send(Producer.scala:33)
>         at com.up.jsonType.FlatFileDataProducer.run(FlatFileDataProducer.java:41)
>         at com.up.jsonType.FlatFileDataProducer.main(FlatFileDataProducer.java:49)

The program throws an error, yet when I check the consumer shell, the message has arrived: for the single row in the JSON file, I see 4 entries in the shell. Problem 2:

[root@sandbox bin]# [root@sandbox bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic JsonTopic --from-beginning

1,Smith,25
1,Smith,25
1,Smith,25
1,Smith,25

I get the same row of data 4 times.

You can try adding the property below:

prop.put("producer.type","async");

You need to remove the following two properties:

    //prop.put("request.required.acks", "1");
    //prop.put("producer.type","async");

This property is what actually takes care of acknowledgment.
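
To make the suggestion concrete, here is a minimal sketch of the producer settings with both properties left out. The `ProducerProps` class and its `build` method are hypothetical names used only for illustration; the property keys and the broker address are the ones from the question. As far as I know, when `request.required.acks` is not set the old Scala producer defaults to 0, meaning it does not wait for a broker acknowledgment, so a send that never reaches the broker would go unnoticed. The four copies in the consumer are consistent with one initial attempt plus the default three retries (`message.send.max.retries`), each of which reached the broker without an acknowledgment making it back.

```java
import java.util.Properties;

// Hypothetical helper (not part of the question's code): builds the
// producer settings with the two properties from the answer left out.
final class ProducerProps {

    static Properties build(String brokerList) {
        Properties prop = new Properties();
        prop.put("metadata.broker.list", brokerList);
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        // Deliberately not set, as suggested above:
        //   request.required.acks
        //   producer.type
        return prop;
    }

    public static void main(String[] args) {
        Properties prop = build("192.168.63.145:9092");
        // In the question's run() method this object would be passed to
        // new ProducerConfig(prop), exactly as before.
        System.out.println(prop.containsKey("request.required.acks")); // false
        System.out.println(prop.getProperty("metadata.broker.list"));
    }
}
```

The rest of `run()` would stay unchanged; only the `Properties` object handed to `ProducerConfig` differs.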