java Kafka Producer 的客户端示例,发送方法不接受 KeyedMessage
java client example for Kafka Producer, send method not accepting KeyedMessage
我是运行ning kafka 2.9.1-0.8.2.1。我将 libs/ 目录中提供的 jars 包含在主 kafka 目录中。现在,我正在尝试 运行 一个 java 生产者示例,按照这里 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example 给出的内容。现在 producer.send
方法似乎正在接受这种说法 Seq<KeyedMessage<String, String>>
。在这个例子中,KeyedMessage 的对象没有被转换成任何东西。当我尝试做同样的事情时,我得到了不兼容类型的编译器错误。
这是代码
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import kafka.producer.Producer;
import scala.collection.Seq;
public class KakfaProducer {
public static void main(String [] args) {
Properties prop = new Properties();
prop.put("metadata.broker.list", "localhost:9092");
prop.put("serializer.class","kafka.serializer.StringEncoder");
//prop.put("partitioner.class", "example.producer.SimplePartitioner");
ProducerConfig producerConfig = new ProducerConfig(prop);
Producer<String,String> producer = new <String,String>Producer(producerConfig);
String topic = "test";
KeyedMessage<String,String> message = new <String,String>KeyedMessage(topic, "Hello Test message");
producer.send(message);
producer.close();
}
}
那个注释代码给我 class def not found 异常。我试着在网上看了很多,但没有帮助。
libs/ 目录中有两种 jar。一个是 kafka-client,另一个只是 kafka 和版本号。我包括错误的罐子吗?我需要和哪一个一起工作?
对于第一个问题,不是导入 scala API,而是导入 Java 一个。
所以,而不是使用:
import kafka.producer.Producer;
请使用:
import kafka.javaapi.producer.Producer;
SimplePartitioner 代码可以在下面找到。添加到对应目录:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
}
return partition;
}
}
运行 Java 中的生产者有 2 种方法。
1) 使用核心卡夫卡。这是你的方法。
2) 使用卡夫卡客户端。
Kafka 0.8.2 Documentation中提到了它们之间的区别。
These new clients are meant to supplant the existing Scala clients, but for compatability they will co-exist for some time. These clients are available in a seperate jar with minimal dependencies, while the old Scala clients remain packaged with the server.
说明新客户端更小,可能会取代原来的方法。
在第 3.4 节中。新的 Producer provider 一些不同的配置。
We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality.
我是运行ning kafka 2.9.1-0.8.2.1。我将 libs/ 目录中提供的 jars 包含在主 kafka 目录中。现在,我正在尝试 运行 一个 java 生产者示例,按照这里 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example 给出的内容。现在 producer.send
方法似乎正在接受这种说法 Seq<KeyedMessage<String, String>>
。在这个例子中,KeyedMessage 的对象没有被转换成任何东西。当我尝试做同样的事情时,我得到了不兼容类型的编译器错误。
这是代码
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
import kafka.producer.Producer;
import scala.collection.Seq;
public class KakfaProducer {
public static void main(String [] args) {
Properties prop = new Properties();
prop.put("metadata.broker.list", "localhost:9092");
prop.put("serializer.class","kafka.serializer.StringEncoder");
//prop.put("partitioner.class", "example.producer.SimplePartitioner");
ProducerConfig producerConfig = new ProducerConfig(prop);
Producer<String,String> producer = new <String,String>Producer(producerConfig);
String topic = "test";
KeyedMessage<String,String> message = new <String,String>KeyedMessage(topic, "Hello Test message");
producer.send(message);
producer.close();
}
}
那个注释代码给我 class def not found 异常。我试着在网上看了很多,但没有帮助。
libs/ 目录中有两种 jar。一个是 kafka-client,另一个只是 kafka 和版本号。我包括错误的罐子吗?我需要和哪一个一起工作?
对于第一个问题,不是导入 scala API,而是导入 Java 一个。 所以,而不是使用:
import kafka.producer.Producer;
请使用:
import kafka.javaapi.producer.Producer;
SimplePartitioner 代码可以在下面找到。添加到对应目录:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % numPartitions;
}
return partition;
}
}
运行 Java 中的生产者有 2 种方法。
1) 使用核心卡夫卡。这是你的方法。 2) 使用卡夫卡客户端。
Kafka 0.8.2 Documentation中提到了它们之间的区别。
These new clients are meant to supplant the existing Scala clients, but for compatability they will co-exist for some time. These clients are available in a seperate jar with minimal dependencies, while the old Scala clients remain packaged with the server.
说明新客户端更小,可能会取代原来的方法。
在第 3.4 节中。新的 Producer provider 一些不同的配置。
We are working on a replacement for our existing producer. The code is available in trunk now and can be considered beta quality.