Apache Storm Flux 简单 KafkaSpout --> KafkaBolt NullPointerException
Apache Storm Flux Simple KafkaSpout --> KafkaBolt NullPointerException
我正在使用 Apache Storm 0.10.0-beta1 并开始将一些拓扑转换为 Flux。我决定从一个简单的拓扑开始,该拓扑从 Kafka 队列读取并写入不同的 Kafka 队列。我收到此错误,我很难找出问题所在。 topology yaml文件遵循错误。
Parsing file: /Users/frank/src/mapper/mapper.yaml
388 [main] INFO o.a.s.f.p.FluxParser - loading YAML from input stream...
391 [main] INFO o.a.s.f.p.FluxParser - Not performing property substitution.
391 [main] INFO o.a.s.f.p.FluxParser - Not performing environment variable substitution.
466 [main] INFO o.a.s.f.FluxBuilder - Detected DSL topology...
Exception in thread "main" java.lang.NullPointerException
at org.apache.storm.flux.FluxBuilder.canInvokeWithArgs(FluxBuilder.java:561)
at org.apache.storm.flux.FluxBuilder.findCompatibleConstructor(FluxBuilder.java:392)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildSpout(FluxBuilder.java:361)
at org.apache.storm.flux.FluxBuilder.buildSpouts(FluxBuilder.java:349)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:84)
at org.apache.storm.flux.Flux.runCli(Flux.java:153)
at org.apache.storm.flux.Flux.main(Flux.java:98)
拓扑 yaml:
name: "mapper-topology"
config:
topology.workers: 1
topology.debug: true
kafka.broker.properties.metadata.broker.list: "localhost:9092"
kafka.broker.properties.request.required.acks: "1"
kafka.broker.properties.serializer.class: "kafka.serializer.StringEncoder"
# component definitions
components:
- id: "topicSelector"
className: "storm.kafka.bolt.selector.DefaultTopicSelector"
constructorArgs:
- "schemaq"
- id: "kafkaMapper"
className: "storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
# spout definitions
spouts:
- id: "kafka-spout"
className: "storm.kafka.SpoutConfig"
parallelism: 1
constructorArgs:
- ref: "zkHosts"
- "mapperq"
- "/mapperq"
- "id-mapperq"
properties:
- name: "forceFromStart"
value: true
- name: "scheme"
ref: "stringMultiScheme"
# bolt definitions
bolts:
- id: "kafka-bolt"
className: "storm.kafka.bolt.KafkaBolt"
parallelism: 1
configMethods:
- name: "withTopicSelector"
args: [ref: "topicSelector"]
- name: "withTupleToKafkaMapper"
args: [ref: "kafkaMapper"]
# streams
streams:
- name: "kafka-spout --> kafka-bolt"
from: "kafka-spout"
to: "kafka-bolt"
grouping:
type: SHUFFLE
这是命令:
storm jar /Users/frank/src/mapper/target/mapper-0.1.0-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local mapper.yaml
spout 类名应该是 storm.kafka.KafkaSpout,而不是 storm.kafka.SpoutConfig。您应该将 SpoutConfig 定义到 "components" 部分,并让 spout 引用它。
您可以参考https://github.com/apache/storm/blob/master/external/flux/flux-examples/src/main/resources/kafka_spout.yaml了解如何从 flux 设置 KafkaSpout。
我正在使用 Apache Storm 0.10.0-beta1 并开始将一些拓扑转换为 Flux。我决定从一个简单的拓扑开始,该拓扑从 Kafka 队列读取并写入不同的 Kafka 队列。我收到此错误,我很难找出问题所在。 topology yaml文件遵循错误。
Parsing file: /Users/frank/src/mapper/mapper.yaml
388 [main] INFO o.a.s.f.p.FluxParser - loading YAML from input stream...
391 [main] INFO o.a.s.f.p.FluxParser - Not performing property substitution.
391 [main] INFO o.a.s.f.p.FluxParser - Not performing environment variable substitution.
466 [main] INFO o.a.s.f.FluxBuilder - Detected DSL topology...
Exception in thread "main" java.lang.NullPointerException
at org.apache.storm.flux.FluxBuilder.canInvokeWithArgs(FluxBuilder.java:561)
at org.apache.storm.flux.FluxBuilder.findCompatibleConstructor(FluxBuilder.java:392)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildSpout(FluxBuilder.java:361)
at org.apache.storm.flux.FluxBuilder.buildSpouts(FluxBuilder.java:349)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:84)
at org.apache.storm.flux.Flux.runCli(Flux.java:153)
at org.apache.storm.flux.Flux.main(Flux.java:98)
拓扑 yaml:
name: "mapper-topology"
config:
topology.workers: 1
topology.debug: true
kafka.broker.properties.metadata.broker.list: "localhost:9092"
kafka.broker.properties.request.required.acks: "1"
kafka.broker.properties.serializer.class: "kafka.serializer.StringEncoder"
# component definitions
components:
- id: "topicSelector"
className: "storm.kafka.bolt.selector.DefaultTopicSelector"
constructorArgs:
- "schemaq"
- id: "kafkaMapper"
className: "storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper"
# spout definitions
spouts:
- id: "kafka-spout"
className: "storm.kafka.SpoutConfig"
parallelism: 1
constructorArgs:
- ref: "zkHosts"
- "mapperq"
- "/mapperq"
- "id-mapperq"
properties:
- name: "forceFromStart"
value: true
- name: "scheme"
ref: "stringMultiScheme"
# bolt definitions
bolts:
- id: "kafka-bolt"
className: "storm.kafka.bolt.KafkaBolt"
parallelism: 1
configMethods:
- name: "withTopicSelector"
args: [ref: "topicSelector"]
- name: "withTupleToKafkaMapper"
args: [ref: "kafkaMapper"]
# streams
streams:
- name: "kafka-spout --> kafka-bolt"
from: "kafka-spout"
to: "kafka-bolt"
grouping:
type: SHUFFLE
这是命令:
storm jar /Users/frank/src/mapper/target/mapper-0.1.0-SNAPSHOT-standalone.jar org.apache.storm.flux.Flux --local mapper.yaml
spout 类名应该是 storm.kafka.KafkaSpout,而不是 storm.kafka.SpoutConfig。您应该将 SpoutConfig 定义到 "components" 部分,并让 spout 引用它。
您可以参考https://github.com/apache/storm/blob/master/external/flux/flux-examples/src/main/resources/kafka_spout.yaml了解如何从 flux 设置 KafkaSpout。