Get Storm running on Kafka
I have a problem running an example that connects Storm and Kafka. I got Kafka running successfully; the next step is to connect Storm.
Here is the sample code.
import java.util.HashMap;

import org.apache.log4j.Logger; // assumed: the log4j 1.x API, which matches Logger.getLogger(Class)
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

/**
 * @author Amit Kumar
 */
public class Topology {
    private static final Logger LOG = Logger.getLogger(Topology.class);

    public static void main(String[] args) {
        // Build the spout configuration (values are hard-coded here)
        final BrokerHosts zkrHosts = new ZkHosts("gitserver:2181");
        final String kafkaTopic = "test";
        final String zkRoot = "";
        final String clientId = "client1";
        final SpoutConfig kafkaConf = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, clientId);
        kafkaConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Build a topology that consumes messages from Kafka and prints them on the console
        final TopologyBuilder topologyBuilder = new TopologyBuilder();
        // Create a KafkaSpout instance from the Kafka configuration and add it to the topology
        topologyBuilder.setSpout("kafka-spout", new KafkaSpout(kafkaConf), 1);
        // Route the output of the Kafka spout to the LoggerBolt (not shown) to log messages consumed from Kafka
        topologyBuilder.setBolt("print-messages", new LoggerBolt()).globalGrouping("kafka-spout");
        // Submit the topology to a local cluster, i.e. an embedded Storm instance in Eclipse
        final LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("kafka-topology", new HashMap(), topologyBuilder.createTopology());
    }
}
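I get an error that I don't understand. Can anyone help?
I think it has something to do with the value of zkRoot. I tried different paths (for example the ZooKeeper installation or data directory), but nothing worked.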
21790 [Thread-18-kafka-spout-executor[2 2]] INFO o.a.s.k.ZkCoordinator - Task [1/1], Task-ID: 2 New partition managers: [Partition{host=gitserver:9092, topic=test, partition=0}]
21839 [Thread-18-kafka-spout-executor[2 2]] INFO o.a.s.k.PartitionManager - Read partition information from: /client1/partition_0 --> null
21877 [Thread-18-kafka-spout-executor[2 2]] INFO k.c.SimpleConsumer - Reconnect due to socket error: java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend: method <init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V not found
21879 [Thread-18-kafka-spout-executor[2 2]] ERROR o.a.s.util - Async loop died!
java.lang.NoSuchMethodError: org.apache.kafka.common.network.NetworkSend: method <init>(Ljava/lang/String;[Ljava/nio/ByteBuffer;)V not found
at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:41) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.network.RequestOrResponseSend.<init>(RequestOrResponseSend.scala:44) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.network.BlockingChannel.send(BlockingChannel.scala:112) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.liftedTree1(SimpleConsumer.scala:98) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:149) ~[kafka_2.11-0.9.0.0.jar:?]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[kafka_2.11-0.9.0.0.jar:?]
at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:81) ~[storm-kafka-1.2.2.jar:1.2.2]
at org.apache.storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:71) ~[storm-kafka-1.2.2.jar:1.2.2]
at org.apache.storm.kafka.PartitionManager.<init>(PartitionManager.java:135) ~[storm-kafka-1.2.2.jar:1.2.2]
at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:110) ~[storm-kafka-1.2.2.jar:1.2.2]
at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:71) ~[storm-kafka-1.2.2.jar:1.2.2]
at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-kafka-1.2.2.jar:1.2.2]
at org.apache.storm.daemon.executor$fn__10727$fn__10742$fn__10773.invoke(executor.clj:654) ~[storm-core-1.2.2.jar:1.2.2]
at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) [storm-core-1.2.2.jar:1.2.2]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_73]
21884 [Thread-18-kafka-spout-executor[2 2]] ERROR o.a.s.d.executor -
Please use storm-kafka-client instead of storm-kafka for new code; the latter is deprecated.
The error you are seeing is probably caused by mixing different Kafka versions on your classpath. I checked the class the error comes from, and the constructor the error mentions exists in Kafka 0.9 (https://github.com/apache/kafka/blob/0.9.0.0/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java#L26) but not in 0.8 (https://github.com/apache/kafka/blob/0.8.2.2/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java#L26).
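One way to confirm this from inside the JVM is to ask which jar NetworkSend was actually loaded from and whether it declares the (String, ByteBuffer[]) constructor named in the stack trace. The following is only a rough sketch: ClassOrigin and its two methods are hypothetical helpers written for this answer, not part of Storm or Kafka, and it uses nothing beyond standard Java reflection.

```java
import java.nio.ByteBuffer;
import java.security.CodeSource;

/** Hypothetical diagnostic helper; not part of Storm or Kafka. */
class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, or a placeholder if the JVM does not report one. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return (source == null || source.getLocation() == null)
                ? "(bootstrap/unknown)"
                : source.getLocation().toString();
    }

    /** True if the named class can be loaded and declares a constructor with exactly these parameter types. */
    static boolean hasConstructor(String className, Class<?>... parameterTypes) {
        try {
            Class.forName(className).getDeclaredConstructor(parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.kafka.common.network.NetworkSend";
        try {
            Class<?> networkSend = Class.forName(name);
            System.out.println(name + " loaded from " + locationOf(networkSend));
            // The descriptor (Ljava/lang/String;[Ljava/nio/ByteBuffer;)V in the error means (String, ByteBuffer[]).
            System.out.println("has (String, ByteBuffer[]) constructor: "
                    + hasConstructor(name, String.class, ByteBuffer[].class));
        } catch (ClassNotFoundException e) {
            System.out.println(name + " is not on the classpath");
        }
    }
}
```

Run it with the same classpath as the topology. If the reported jar is a kafka-clients 0.8.x jar and the constructor check prints false, that would match the NoSuchMethodError raised from kafka_2.11-0.9.0.0.jar.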
Most likely you have accidentally pulled Kafka 0.8 classes into your build. I would run mvn dependency:tree on your topology project and make sure all Kafka jars come from the same version.
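If the topology is started from an IDE rather than from Maven, a similar check can be made against the runtime classpath. The sketch below is an illustration only: KafkaJarVersions is a hypothetical helper, and it assumes the jars keep their conventional file names (kafka_<scala version>-<version>.jar and kafka-clients-<version>.jar), so it will miss renamed or shaded jars. mvn dependency:tree remains the more reliable check.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that groups the Kafka jars on a classpath by version. */
class KafkaJarVersions {

    // Matches file names such as kafka_2.11-0.9.0.0.jar or kafka-clients-0.9.0.0.jar.
    // storm-kafka and storm-kafka-client jars are deliberately not matched.
    private static final Pattern KAFKA_JAR =
            Pattern.compile("(kafka(?:_[\\d.]+|-clients))-(\\d+(?:\\.\\d+)+)\\.jar$");

    /** Maps each Kafka version found on the given classpath string to the artifacts that carry it. */
    static Map<String, List<String>> versionsOn(String classpath) {
        Map<String, List<String>> byVersion = new TreeMap<>();
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            Matcher m = KAFKA_JAR.matcher(entry);
            if (m.find()) {
                byVersion.computeIfAbsent(m.group(2), v -> new ArrayList<>()).add(m.group(1));
            }
        }
        return byVersion;
    }

    public static void main(String[] args) {
        Map<String, List<String>> found = versionsOn(System.getProperty("java.class.path"));
        System.out.println("Kafka jars by version: " + found);
        if (found.size() > 1) {
            System.out.println("Mixed Kafka versions on the classpath");
        }
    }
}
```

More than one key in the resulting map means the Kafka core jar and the kafka-clients jar disagree on version, which is the situation described above.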