无法向 Apache Kafka 推送消息?
Unable to push messages to Apache Kafka?
我是 Kafka 的新手,正在尝试运行 Apache 的 Java 生产者示例代码,将数据推送到 Kafka。我可以通过 Java 创建新主题,但在推送消息时出现异常。代码如下:
package kafkaTest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Minimal Kafka producer example.
 *
 * <p>Prints the children of {@code /brokers/topics} read from ZooKeeper, then sends one
 * timestamped string message to {@link #TOPIC} through the broker listed in
 * {@code metadata.broker.list}.
 */
public class HelloKafkaProducer {
    final static String TOPIC = "test_kafka1";

    /**
     * Lists the topics registered in ZooKeeper and publishes a single test message.
     *
     * @param argv command-line arguments (unused)
     */
    public static void main(String[] argv) {
        // ZooKeeper is used only to list the existing topics; the two ints are presumably the
        // session and connection timeouts in milliseconds (verify against the ZkClient API).
        ZkClient zkClient = new ZkClient("172.25.37.66:2181", 4000, 6000, new BytesPushThroughSerializer());
        try {
            List<String> topics = zkClient.getChildren("/brokers/topics");
            for (String topic : topics) {
                System.out.println(topic);
            }
        } finally {
            // Release the ZooKeeper session even if the lookup fails.
            zkClient.close();
        }

        // The producer locates brokers through metadata.broker.list, so no ZooKeeper
        // setting is put into its configuration.
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "172.25.37.66:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);

        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        try {
            SimpleDateFormat sdf = new SimpleDateFormat();
            KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                    TOPIC, "Test message from java program " + sdf.format(new Date()));
            System.out.println(message);
            producer.send(message);
        } finally {
            // Always close the producer so its broker connections are released,
            // including when send() throws FailedToSendMessageException.
            producer.close();
        }
        /*Consumer consumerThread = new Consumer(TOPIC);
        consumerThread.start();*/
    }
}
这是堆栈跟踪:
topic1
test_kafka1
topic11
test
test_kafka
KeyedMessage(test_kafka1,null,null,Test message from java program 4/5/15 1:30 PM)
Exception in thread "main" [2015-05-04 13:30:41,432] ERROR Failed to send requests for topics test_kafka1 with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler:97)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at kafkaTest.HelloKafkaProducer.main(HelloKafkaProducer.java:54)
每次运行程序时,我都会在控制台上看到 [2015-05-04 18:55:29,959] INFO Closing socket connection to /172.17.70.73. (kafka.network.Processor)。
不过,我可以通过控制台工具对该主题进行推送和拉取。
如能提供任何帮助,我将不胜感激。
谢谢
对于 Kafka 生产者(Producer),您不需要连接到 ZooKeeper,而是必须直接连接到代理(broker)。为此,请使用以下属性:
props.put("metadata.broker.list", "localhost:9092, broker1:9092");
这里我用的是 localhost;在你的环境中,应将其替换为 172.25.37.66。
我是 Kafka 的新手,正在尝试运行 Apache 的 Java 生产者示例代码,将数据推送到 Kafka。我可以通过 Java 创建新主题,但在推送消息时出现异常。代码如下:
package kafkaTest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Minimal Kafka producer example.
 *
 * <p>Prints the children of {@code /brokers/topics} read from ZooKeeper, then sends one
 * timestamped string message to {@link #TOPIC} through the broker listed in
 * {@code metadata.broker.list}.
 */
public class HelloKafkaProducer {
    final static String TOPIC = "test_kafka1";

    /**
     * Lists the topics registered in ZooKeeper and publishes a single test message.
     *
     * @param argv command-line arguments (unused)
     */
    public static void main(String[] argv) {
        // ZooKeeper is used only to list the existing topics; the two ints are presumably the
        // session and connection timeouts in milliseconds (verify against the ZkClient API).
        ZkClient zkClient = new ZkClient("172.25.37.66:2181", 4000, 6000, new BytesPushThroughSerializer());
        try {
            List<String> topics = zkClient.getChildren("/brokers/topics");
            for (String topic : topics) {
                System.out.println(topic);
            }
        } finally {
            // Release the ZooKeeper session even if the lookup fails.
            zkClient.close();
        }

        // The producer locates brokers through metadata.broker.list, so no ZooKeeper
        // setting is put into its configuration.
        Properties properties = new Properties();
        properties.put("metadata.broker.list", "172.25.37.66:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(properties);

        kafka.javaapi.producer.Producer<String, String> producer =
                new kafka.javaapi.producer.Producer<String, String>(producerConfig);
        try {
            SimpleDateFormat sdf = new SimpleDateFormat();
            KeyedMessage<String, String> message = new KeyedMessage<String, String>(
                    TOPIC, "Test message from java program " + sdf.format(new Date()));
            System.out.println(message);
            producer.send(message);
        } finally {
            // Always close the producer so its broker connections are released,
            // including when send() throws FailedToSendMessageException.
            producer.close();
        }
        /*Consumer consumerThread = new Consumer(TOPIC);
        consumerThread.start();*/
    }
}
这是堆栈跟踪:
topic1
test_kafka1
topic11
test
test_kafka
KeyedMessage(test_kafka1,null,null,Test message from java program 4/5/15 1:30 PM)
Exception in thread "main" [2015-05-04 13:30:41,432] ERROR Failed to send requests for topics test_kafka1 with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler:97)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at kafkaTest.HelloKafkaProducer.main(HelloKafkaProducer.java:54)
每次运行程序时,我都会在控制台上看到 [2015-05-04 18:55:29,959] INFO Closing socket connection to /172.17.70.73. (kafka.network.Processor)。
不过,我可以通过控制台工具对该主题进行推送和拉取。
如能提供任何帮助,我将不胜感激。谢谢
对于 Kafka 生产者(Producer),您不需要连接到 ZooKeeper,而是必须直接连接到代理(broker)。为此,请使用以下属性:
props.put("metadata.broker.list", "localhost:9092, broker1:9092");
这里我用的是 localhost;在你的环境中,应将其替换为 172.25.37.66。