How to get the first Storm Kafka demo to work?
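Submitting a simple first Storm topology is driving me crazy. First, when I tried to run it against a remote VM with Storm installed, I got a connection refused error. As I understand it, I have to run it on a local cluster instead.
These are the lines of code I came up with: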
public void execTopology_alt() throws Exception {
    final TopologyBuilder tp = new TopologyBuilder();
    tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder(this.bootstrapServers, topic).build()), 1);
    tp.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");
    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("kafkaboltTest", conf, tp.createTopology());
}
There seem to be 2 problems that have kept me busy for hours.
1) Class not found
20:48:40.202 [main] INFO org.apache.storm.daemon.metrics.ClientMetricsUtils - Using statistics reporter plugin:org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter
20:48:40.203 [main] INFO org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter - Preparing...
Exception in thread "main" java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter
at org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter.prepare(JmxPreparableReporter.java:32)
at org.apache.storm.metric.StormMetricsRegistry.startMetricsReporters(StormMetricsRegistry.java:74)
at org.apache.storm.LocalCluster.<init>(LocalCluster.java:287)
at org.apache.storm.LocalCluster.<init>(LocalCluster.java:159)
at tki.bigdata.storm.StormTopology.execTopology_alt(StormTopology.java:93)
at tki.bigdata.storm.StormTopology.main(StormTopology.java:46)
Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 6 more
I am posting my pom.xml. I am also using Spring Boot and Spring Kafka (the Kafka part works). Maybe that does not play well with the Storm dependencies?
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>tki.bigdata</groupId>
<artifactId>RealTime</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>RealTime</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-server -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-server</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-kafka-client -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2) When I run it, there is a huge amount of output that repeats forever
20:48:40.968 [main-SendThread(localhost:2004)] DEBUG org.apache.storm.shade.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1000fc645fb0005, packet:: clientPath:null serverPath:null finished:false header:: 20,4 replyHeader:: 20,39,0 request:: '/storm/supervisors/6de1c57f-06e9-49ae-984f-09b5ad7d05e5,F response:: #1fffffff8b80000000354e41effffff8230105c1022ffffffd018ffffffe2ffffffc1ffffff8377ffffffaf6dffffffaaffffffa160f267cffffffc432d2531ffffffa6ffffffb66930115fffffffe0737cffffff94fffffffb1ffffff8d38ffffff87ffffff9dffffffddffffffd9ffffff9dffffffdd2d202ffffff8f737a7a21ffffff887dffffffb6ffffffd1ffffff9d14ffffffc65cffffffb175ffffff86cffffffee323effffff8930f4bffffffdfffffffdbffffffd5ffffffbdffffffda4bffffffd6cffffff98ffffffd6ffffff8affffffe3ffffff8a77afffffff363ffffffe54b2e58ffffffd7373d65ffffff8affffff95ffffff90146165969ffffffe0ffffffe0fffffff34860ffffffe668ffffffe6ffffffb8ffffff80ffffffe5ffffffac1ffffff82ffffffccffffffc7fffffff44028ffffffa12bffffffc85118effffffbfffffffacffffffa5ffffffbd132bffffff8d53ffffffc42a27ffffffd56d6cffffffa7ffffffbf772ffffffadffffffb47113ffffffd1ffffffa27dffffffffffffff94cfffffffcffffffc1ffffffed7ffffffbe7fffffffda34ffffffc2ffffffd4000,s{35,35,1560624520123,1560624520123,0,0,0,72074938289946636,186,0,35}
20:48:40.968 [SyncThread:0] DEBUG org.apache.storm.shade.org.apache.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x1000fc645fb0005 type:exists cxid:0x15 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/supervisors/5488b159-db82-42d0-b24e-354c59ddce34
20:48:40.969 [SyncThread:0] DEBUG org.apache.storm.shade.org.apache.zookeeper.server.FinalRequestProcessor - sessionid:0x1000fc645fb0005 type:exists cxid:0x15 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/supervisors/5488b159-db82-42d0-b24e-354c59ddce34
20:48:40.969 [main-SendThread(localhost:2004)] DEBUG org.apache.storm.shade.org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1000fc645fb0005, packet:: clientPath:null serverPath:null finished:false header:: 21,3 replyHeader:: 21,39,0 request:: '/storm/supervisors/5488b159-db82-42d0-b24e-354c59ddce34,F response:: s{39,39,1560624520167,1560624520167,0,0,0,72074938289946638,185,0,39}
20:48:40.969 [SyncThread:0] DEBUG org.apache.storm.shade.org.apache.zookeeper.server.FinalRequestProcessor - Processing request:: sessionid:0x1000fc645fb0005 type:getData cxid:0x16 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/supervisors/5488b159-db82-42d0-b24e-354c59ddce34
20:48:40.969 [SyncThread:0] DEBUG org.apache.storm.shade.org.apache.zookeeper.server.FinalRequestProcessor - sessionid:0x1000fc645fb0005 type:getData cxid:0x16 zxid:0xfffffffffffffffe txntype:unknown reqpath:/storm/supervisors/5488b159-db82-42d0-b24e-354c59ddce34
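The problem is with your POM and your Kafka setup.
Remove storm-kafka, you don't need it. It is Storm's old Kafka integration, and storm-kafka-client replaces it.
I would avoid using Spring with Storm. Storm is not aware of Spring beans, so your Spring setup may work only on a local cluster and not in a production setup.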
You need storm-client on the classpath. Set it to provided scope. You can also remove storm-core.
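You most likely need to set a consumer group in the spout's Kafka configuration. Call setProp(ConsumerConfig.GROUP_ID_CONFIG, "your-group-here") on the builder returned by KafkaSpoutConfig.builder(...) before calling build().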
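As a rough illustration of the consumer group setting, the sketch below collects the consumer properties in a plain map using only the JDK. The class name SpoutConsumerProps and the helper consumerProps are hypothetical names made up for this example. The only Kafka-specific fact it relies on is that ConsumerConfig.GROUP_ID_CONFIG is the string "group.id". In the real topology, each entry would be applied with setProp(key, value) on the spout config builder.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: gathers the Kafka consumer properties for the spout.
public class SpoutConsumerProps {

    // String value of Kafka's ConsumerConfig.GROUP_ID_CONFIG constant.
    static final String GROUP_ID_KEY = "group.id";

    // Builds the property map; rejects a missing or empty consumer group.
    static Map<String, Object> consumerProps(String groupId) {
        if (groupId == null || groupId.trim().isEmpty()) {
            throw new IllegalArgumentException("a consumer group id is required");
        }
        Map<String, Object> props = new LinkedHashMap<>();
        props.put(GROUP_ID_KEY, groupId);
        return props;
    }

    public static void main(String[] args) {
        // In the topology, each entry would be passed to setProp(key, value)
        // on the KafkaSpoutConfig builder before build() is called.
        for (Map.Entry<String, Object> e : consumerProps("your-group-here").entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue());
        }
    }
}
```

Keeping the properties in one place like this is only a design suggestion; calling setProp directly on the builder works just as well for a single setting.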
There is a complete example at https://github.com/apache/storm/blob/v2.0.0/examples/storm-kafka-client-examples, which I would use as a starting point.
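After changing the dependencies, it may help to confirm what is actually visible on the classpath before starting the local cluster. The following is a minimal sketch using only the JDK. ClasspathCheck and isPresent are hypothetical names, and the two class names checked in main are taken from the stack trace and the code in the question.

```java
// Hypothetical diagnostic: reports whether given classes can be loaded.
public class ClasspathCheck {

    // Returns true if the named class can be located by this class's loader.
    static boolean isPresent(String className) {
        try {
            // initialize = false: locate the class without running static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "com.codahale.metrics.JmxReporter", // class missing in the stack trace
            "org.apache.storm.LocalCluster"     // class instantiated in the question's code
        };
        for (String name : names) {
            System.out.println(name + " -> " + (isPresent(name) ? "found" : "MISSING"));
        }
    }
}
```

Run it with the same classpath as the topology (for example from the same IDE run configuration), otherwise the result says nothing about the failing program.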