Storm 1.2.2 和 Kafka 版本 2.x
Storm 1.2.2 and Kafka Version 2.x
我正在测试一个使用 Storm 1.2.2 和 Kafka 2.x 作为我的 Spout 的案例。所以我创建了一个 LocalCluster 只是为了测试目的。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("MYKAFKAIP:9092", "storm-test-dpi").build()), 1);
builder.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("kafkaBoltTest", new Config(), builder.createTopology());
Utils.sleep(10000);
初始化此应用程序后,我得到以下信息:
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.10.1.0
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : 3402a74efb23d1d4
在大量错误之后:
9664 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.s.k.s.KafkaSpout - Initialization complete
9703 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9714 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9742 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9756 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9767 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9781 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9806 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
我认为这个问题是因为 Kafka 版本,你可以看到日志显示版本“0.10.1.0”,但我的 Kafka 版本是“2.x”。
这是我的 pom.xml:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${version.storm}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${version.storm}</version>
</dependency>
其中 ${version.storm}
是 1.2.2
您还应该声明您正在使用的 kafka-clients
版本。 storm-kafka-client
POM 将 kafka-clients
范围设置为 provided
。这意味着 kafka-clients
不会在您构建时包含在内。我们这样做是为了让您可以轻松升级。
它对你来说甚至 运行 的原因是因为你在一些测试代码中使用了 LocalCluster,其中存在 provided
依赖项。
将此添加到您的 POM,它应该可以工作:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>your-kafka-version-here</version>
</dependency>
我正在测试一个使用 Storm 1.2.2 和 Kafka 2.x 作为我的 Spout 的案例。所以我创建了一个 LocalCluster 只是为了测试目的。
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("MYKAFKAIP:9092", "storm-test-dpi").build()), 1);
builder.setBolt("bolt", new LoggerBolt()).shuffleGrouping("kafka_spout");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("kafkaBoltTest", new Config(), builder.createTopology());
Utils.sleep(10000);
初始化此应用程序后,我得到以下信息:
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka version : 0.10.1.0
9293 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.k.c.u.AppInfoParser - Kafka commitId : 3402a74efb23d1d4
在大量错误之后:
9664 [Thread-20-kafka_spout-executor[3 3]] INFO o.a.s.k.s.KafkaSpout - Initialization complete
9703 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9714 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9742 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9756 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9767 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9781 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
9806 [Thread-20-kafka_spout-executor[3 3]] WARN o.a.k.c.c.i.Fetcher - Unknown error fetching data for topic-partition storm-test-dpi-0
我认为这个问题是因为 Kafka 版本,你可以看到日志显示版本“0.10.1.0”,但我的 Kafka 版本是“2.x”。
这是我的 pom.xml:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${version.storm}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${version.storm}</version>
</dependency>
其中 ${version.storm}
是 1.2.2
您还应该声明您正在使用的 kafka-clients
版本。 storm-kafka-client
POM 将 kafka-clients
范围设置为 provided
。这意味着 kafka-clients
不会在您构建时包含在内。我们这样做是为了让您可以轻松升级。
它对你来说甚至 运行 的原因是因为你在一些测试代码中使用了 LocalCluster,其中存在 provided
依赖项。
将此添加到您的 POM,它应该可以工作:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>your-kafka-version-here</version>
</dependency>