Storm-kafka spout failing

I am using Storm 0.9.4 with storm-kafka:0.9.0-wip16a-scala292 as a dependency to read from Kafka 0.7.

Within a few minutes of starting the topology, I get the following error:

kafka.common.OffsetOutOfRangeException: null
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_75]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_75]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_75]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_75]
        at java.lang.Class.newInstance(Class.java:379) ~[na:1.7.0_75]
        at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) ~[stormjar.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) ~[stormjar.jar:na]
        at kafka.message.MessageSet.foreach(MessageSet.scala:87) ~[stormjar.jar:na]
        at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104) ~[stormjar.jar:na]
        at kafka.message.MessageSet.size(MessageSet.scala:87) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:113) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.next(PartitionManager.java:83) ~[stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
2015-04-30T01:49:15.118-0500 backtype.storm.daemon.executor [ERROR] kafka.common.OffsetOutOfRangeException: null
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.7.0_75]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) ~[na:1.7.0_75]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.7.0_75]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526) ~[na:1.7.0_75]
        at java.lang.Class.newInstance(Class.java:379) ~[na:1.7.0_75]
        at kafka.common.ErrorMapping$.maybeThrowException(ErrorMapping.scala:53) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.kafka$message$ByteBufferMessageSet$$internalIterator(ByteBufferMessageSet.scala:99) ~[stormjar.jar:na]
        at kafka.message.ByteBufferMessageSet.iterator(ByteBufferMessageSet.scala:82) ~[stormjar.jar:na]
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) ~[stormjar.jar:na]
        at kafka.message.MessageSet.foreach(MessageSet.scala:87) ~[stormjar.jar:na]
        at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:104) ~[stormjar.jar:na]
        at kafka.message.MessageSet.size(MessageSet.scala:87) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:113) ~[stormjar.jar:na]
        at storm.kafka.PartitionManager.next(PartitionManager.java:83) ~[stormjar.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:106) ~[stormjar.jar:na]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) ~[storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) ~[storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
2015-04-30T01:49:15.129-0500 backtype.storm.util [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
        at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
        at backtype.storm.daemon.worker$fn__5102$fn__5103.invoke(worker.clj:495) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.daemon.executor$mk_executor_data$fn__4555$fn__4556.invoke(executor.clj:240) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:473) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]

I haven't been able to figure out why this happens. Any help or pointers on how to track down the problem would be appreciated.

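The problem was in how the spout was configured. We use custom properties to initialize the SpoutConfig object, and we always set forceFromStart and startOffsetTime (latest). However, the property for startOffsetTime was configured with the wrong key, so the intended value was never applied. As a result, the spout's ZooKeeper entry sometimes held an older offset that either no longer existed in Kafka, or existed when the Storm topology started but was deleted from Kafka before Storm finished working through the backlog. Since we don't want to handle that case anyway, we simply corrected the configuration, and now it works.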
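The fix above comes down to making sure the start-offset property is actually read. Below is a minimal Java sketch, not storm-kafka code: the class `StartOffsetConfig` and the property keys `kafka.spout.startOffsetTime` and `kafka.spout.forceFromStart` are hypothetical. It fails fast when a key is missing, so a misspelled key cannot silently fall back to a default. It maps "latest"/"earliest" to Kafka's special offset-time values (-1 and -2). It also includes a range check that illustrates why an offset stored in ZooKeeper triggers `OffsetOutOfRangeException` once the broker has deleted the data it points to.

```java
import java.util.Locale;
import java.util.Properties;

/**
 * Hypothetical helper (not part of storm-kafka) that turns custom topology
 * properties into the forceFromStart / startOffsetTime values for the spout.
 * The property keys are illustrative assumptions.
 */
final class StartOffsetConfig {
    // Kafka's special "offset time" values: -1 = latest, -2 = earliest.
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Hypothetical keys; substitute whatever keys your own config defines.
    static final String START_OFFSET_KEY = "kafka.spout.startOffsetTime";
    static final String FORCE_FROM_START_KEY = "kafka.spout.forceFromStart";

    private StartOffsetConfig() {}

    // Fails fast on a missing key instead of silently using a default,
    // which is how a wrong key can go unnoticed.
    private static String require(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            throw new IllegalArgumentException("missing property: " + key);
        }
        return raw.trim();
    }

    static long resolveStartOffsetTime(Properties props) {
        String raw = require(props, START_OFFSET_KEY);
        switch (raw.toLowerCase(Locale.ROOT)) {
            case "latest":
                return LATEST_TIME;
            case "earliest":
                return EARLIEST_TIME;
            default:
                // Anything else is treated as a timestamp in milliseconds;
                // a non-numeric value throws NumberFormatException.
                return Long.parseLong(raw);
        }
    }

    static boolean resolveForceFromStart(Properties props) {
        return Boolean.parseBoolean(require(props, FORCE_FROM_START_KEY));
    }

    /**
     * True when a stored offset still lies inside the range the broker
     * retains. The upper bound (log end) is inclusive: fetching there just
     * returns no messages, while anything outside the range is out of range.
     */
    static boolean isWithinRetainedRange(long storedOffset, long earliestOffset, long latestOffset) {
        return storedOffset >= earliestOffset && storedOffset <= latestOffset;
    }
}
```

The resolved values would then be assigned to the spout configuration's forceFromStart and startOffsetTime settings; check the exact field or method names against the storm-kafka version in use.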