storm.kafka.UpdateOffsetException - Issue with Opaque Trident Kafka Spout
I am using a Trident topology with OpaqueTridentKafkaSpout.
This is the TridentKafkaConfig snippet I am using:
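// Read "topic_name", discovering the Kafka brokers through this ZooKeeper ensemble
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(new ZkHosts("xxx.x.x.9:2181,xxx.x.x.1:2181,xxx.x.x.2:2181"), "topic_name");
// Deserialize each Kafka message as a single string field
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// Maximum number of bytes requested per fetch (roughly 147 MB)
spoutConfig.fetchSizeBytes = 147483600;
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);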
I get this runtime exception from one of the workers:
java.lang.RuntimeException: storm.kafka.UpdateOffsetException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
    at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
Caused by: storm.kafka.UpdateOffsetException
    at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186)
    at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
    at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
    at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
    at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
    at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
    at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:204)
    at storm.kafka.trident.TridentKafkaEmitter$1.emitPartitionBatch(TridentKafkaEmitter.java:194)
    at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
    at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
    at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
    at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
    at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127)
    ... 6 more
Based on some posts, I tried setting the following on spoutConfig:
spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
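My Kafka retention time is the default, 168 hours (7 days), and the Kafka producer is sending 6800 messages/second to the Storm/Trident topology. I have gone through most of the posts, but none of them seems to resolve this issue. What is the best way to handle it?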
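For readers trying to reason about the numbers above, here is a minimal sketch in plain Java, not code from storm-kafka. It assumes, as an interpretation rather than something established in this thread, that the exception is tied to the spout requesting an offset outside the range the broker still retains. The class OffsetRangeCheck and its method names are hypothetical, and the offsets in main are made-up values.

```java
// Hypothetical helper for illustration only; it is not part of storm-kafka or Kafka.
final class OffsetRangeCheck {

    /** Approximate number of messages retained at a steady rate over the retention window. */
    static long retainedMessages(long messagesPerSecond, int retentionDays) {
        long retentionSeconds = retentionDays * 24L * 60L * 60L;
        return messagesPerSecond * retentionSeconds;
    }

    /** True when the requested offset lies outside what the broker still holds. */
    static boolean isOutOfRange(long requested, long earliest, long latest) {
        return requested < earliest || requested > latest;
    }

    /** Keeps a valid offset; otherwise falls back to the earliest retained one. */
    static long resumeOffset(long requested, long earliest, long latest) {
        return isOutOfRange(requested, earliest, latest) ? earliest : requested;
    }

    public static void main(String[] args) {
        // 6800 messages/second over 7 days of retention (figures from the question)
        System.out.println(retainedMessages(6800L, 7)); // 4112640000

        // Made-up offsets: the broker currently holds [1000, 5000]
        long earliest = 1_000L;
        long latest = 5_000L;
        System.out.println(resumeOffset(500L, earliest, latest));   // 1000 (already expired)
        System.out.println(resumeOffset(3_000L, earliest, latest)); // 3000 (still valid)
    }
}
```

Falling back to the earliest retained offset mirrors the intent of setting startOffsetTime to EarliestTime() above; whether the spout actually applies that fallback depends on the storm-kafka version and configuration in use.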
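I still don't know what caused this issue. Basically, though, we had not been shutting down Storm, ZooKeeper, and Kafka gracefully. That caused the Storm topology to fail, and we had to tear down the whole cluster and rebuild it. Updating to Storm 0.10.0 helped resolve some other issues.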