Kafka 2.9.2-0.8.1.1 no KeyedMessage parameter for producer.send

I am trying to send messages to Kafka from Java. My project uses kafka_2.9.2-0.8.1.1.jar through this Maven dependency:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.9.2</artifactId>
  <version>0.8.1.1</version>
  <exclusions>
    <exclusion>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
    </exclusion>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>

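The problem is with my producer. I want to send a KeyedMessage, and according to the documentation at http://kafka.apache.org/documentation.html#producerapi the producer.send method should take a KeyedMessage as its parameter. When I check the options available for the producer.send call, it only accepts a ProducerData object as a parameter.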

My code is set up like this:

    private String topic;   
    private String key;
    private Properties props = new Properties();
    private Producer<String,String> producer;

    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("zk.connect", "127.0.0.1:2181");  

    //create the producer
    producer = new Producer<String, String>(new ProducerConfig(props));   

    String topic = "test";
    String key = "test_key";
    String message = "test_msg";

    KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, message);
    producer.send(data);  // <---- ERROR is here: send does not accept a KeyedMessage

The actual error reads:

 The method send(ProducerData<String,String>) in the type Producer<String,String> is not applicable for the arguments (KeyedMessage<String,String>)

I think you need to define

 props.put("partitioner.class", "example.producer.SimplePartitioner");

The wiki page says:

The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.
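To make the idea of a partitioning scheme concrete, here is a minimal sketch of key-based partition selection. The class and method names are hypothetical, and it deliberately does not implement Kafka's Partitioner interface, so it runs with only the JDK. It hashes the key and takes the result modulo the partition count, which is one common scheme and not necessarily what the wiki's SimplePartitioner does.

```java
// Hypothetical stand-alone sketch of key-based partition selection.
// It does not implement Kafka's Partitioner interface, so it needs only the JDK.
final class PartitionSketch {

    // Map a key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Mask off the sign bit so a negative hashCode cannot yield a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition for a fixed partition count.
        System.out.println("test_key -> partition " + partitionFor("test_key", 4));
    }
}
```

The point of a scheme like this is that every message with the same key lands on the same partition, as long as the number of partitions does not change.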

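If you don't add the "partitioner.class" property, the producer does not use a custom scheme: as the quote above says, the default partitioner is used for keyed messages, and a message with no key is assigned to a random partition of that topic.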

 props.put("partitioner.class", "example.producer.SimplePartitioner");

If you don't specifically need key-based partition selection, omit the "partitioner.class" property and define the KeyedMessage as follows:

    private String topic;   
    private String key;
    private Properties props = new Properties();
    private Producer<String,String> producer;

    props.put("metadata.broker.list", "localhost:9092");
    props.put("producer.type", "sync");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");

    //create the producer
    producer = new Producer<String, String>(new ProducerConfig(props));   

    topic = "test";
    String message = "test_msg";

    // no key: the two-argument constructor takes only topic and message
    KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, message);
    producer.send(data);
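Since the property keys are plain strings, the compiler will not catch a misspelled or outdated key. A small check like the one below can flag keys that are not in a known list. This is only a sketch: the class name is hypothetical, and KNOWN_KEYS is a hand-picked subset of 0.8 producer settings, not the full list.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: reports property keys that are not in a small known list.
final class ProducerPropsCheck {

    // Hand-picked subset of 0.8 producer keys; extend as needed.
    static final Set<String> KNOWN_KEYS = new HashSet<String>(Arrays.asList(
            "metadata.broker.list", "producer.type", "serializer.class",
            "key.serializer.class", "partitioner.class", "request.required.acks"));

    // Return the keys in props that are absent from KNOWN_KEYS, sorted by name.
    static Set<String> unrecognizedKeys(Properties props) {
        Set<String> unknown = new TreeSet<String>(props.stringPropertyNames());
        unknown.removeAll(KNOWN_KEYS);
        return unknown;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");
        props.put("zk.connect", "127.0.0.1:2181"); // not in the known list above
        System.out.println("Unrecognized keys: " + unrecognizedKeys(props));
    }
}
```

A key being reported here does not prove it is invalid, only that it is not in the list, so treat the output as a prompt to double-check the spelling against the documentation.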

Use

import kafka.javaapi.producer.Producer; 

You may have used this instead:

import kafka.producer.Producer;
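If the import looks right and the error persists, it may be worth checking which jar the Producer class is actually loaded from. The error message names ProducerData, which as far as I know belongs to the older 0.7 producer API rather than 0.8.1.1, so an older Kafka jar on the classpath is one possible cause. That is an assumption, not something confirmed in this thread. The helper below is a hypothetical sketch that uses only the JDK to print where a class comes from.

```java
import java.security.CodeSource;

// Hypothetical diagnostic: reports which jar or directory a class is loaded from.
final class ClassLocator {

    static String locate(String className) {
        try {
            // initialize=false: look the class up without running its static initializers
            Class<?> c = Class.forName(className, false, ClassLocator.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            return src == null ? "(no code source reported)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Run this with the same classpath as the project.
        System.out.println(locate("kafka.javaapi.producer.Producer"));
        System.out.println(locate("kafka.javaapi.producer.ProducerData"));
    }
}
```

If the first line prints a jar other than kafka_2.9.2-0.8.1.1.jar, or the second class is found at all, that would point to a conflicting dependency rather than a problem in the code.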