来自 Apache Storm Trident 和 Kafka 集成的 Spout 错误
Spout Error from Apache Storm Trident and Kafka Integration
我正在使用 OpaqueTridentKafkaSpout 来使用来自 Kafka 的消息。下面是代码。我忽略了 max spout pending
配置,因为这会导致相同的 kafka 消息分批到达。
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);
Kafka Spout 启动时出现以下错误,但之后运行顺利。
2018-05-29 09:47:21.703 o.a.s.util
Thread-9-spout-myspout-Spout-executor[33 33] [ERROR] Async loop died!
java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484)
[storm-core-1.2.1.jar:1.2.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] Caused by: java.lang.NullPointerException
at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193)
~[stormjar.jar:?]
at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127)
~[stormjar.jar:?]
at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51)
~[stormjar.jar:?]
at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41)
~[storm-core-1.2.1.jar:1.2.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509)
~[storm-core-1.2.1.jar:1.2.1]
... 6 more
对此有什么建议吗?
您的堆栈跟踪表明您正在点击 https://issues.apache.org/jira/browse/STORM-3046。
我正在使用 OpaqueTridentKafkaSpout 来使用来自 Kafka 的消息。下面是代码。我忽略了 max spout pending
配置,因为这会导致相同的 kafka 消息分批到达。
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);
Kafka Spout 启动时出现以下错误,但之后运行顺利。
2018-05-29 09:47:21.703 o.a.s.util Thread-9-spout-myspout-Spout-executor[33 33] [ERROR] Async loop died! java.lang.RuntimeException: java.lang.NullPointerException at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:522) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:487) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$fn__5043$fn__5056$fn__5109.invoke(executor.clj:861) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1] at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?] at java.lang.Thread.run(Thread.java:748) [?:1.8.0_171] Caused by: java.lang.NullPointerException at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.seek(KafkaTridentSpoutEmitter.java:193) ~[stormjar.jar:?] at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:127) ~[stormjar.jar:?] at org.apache.storm.kafka.spout.trident.KafkaTridentSpoutEmitter.emitPartitionBatch(KafkaTridentSpoutEmitter.java:51) ~[stormjar.jar:?] at org.apache.storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:141) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:383) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$fn__5043$tuple_action_fn__5045.invoke(executor.clj:739) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.daemon.executor$mk_task_receiver$fn__4964.invoke(executor.clj:468) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.disruptor$clojure_handler$reify__4475.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.jar:1.2.1] at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:509) ~[storm-core-1.2.1.jar:1.2.1] ... 6 more
对此有什么建议吗?
您的堆栈跟踪表明您正在点击 https://issues.apache.org/jira/browse/STORM-3046。