storm.kafka.UpdateOffsetException - Issue with Opaque Trident Kafka Spout

I am using a Trident topology with OpaqueTridentKafkaSpout.

Here is the TridentKafkaConfig snippet I am using:

OpaqueTridentKafkaSpout kafkaSpout = null;
TridentKafkaConfig spoutConfig = new TridentKafkaConfig(new ZkHosts("xxx.x.x.9:2181,xxx.x.x.1:2181,xxx.x.x.2:2181"), "topic_name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.fetchSizeBytes = 147483600;
kafkaSpout = new OpaqueTridentKafkaSpout(spoutConfig);

I get this runtime exception from one of the workers:

java.lang.RuntimeException: storm.kafka.UpdateOffsetException
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:135)
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:106)
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
	at backtype.storm.daemon.executor$fn__5694$fn__5707$fn__5758.invoke(executor.clj:819)
	at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.lang.Thread.run(Thread.java:745)
Caused by: storm.kafka.UpdateOffsetException
	at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:186)
	at storm.kafka.trident.TridentKafkaEmitter.fetchMessages(TridentKafkaEmitter.java:132)
	at storm.kafka.trident.TridentKafkaEmitter.doEmitNewPartitionBatch(TridentKafkaEmitter.java:113)
	at storm.kafka.trident.TridentKafkaEmitter.failFastEmitNewPartitionBatch(TridentKafkaEmitter.java:72)
	at storm.kafka.trident.TridentKafkaEmitter.emitNewPartitionBatch(TridentKafkaEmitter.java:79)
	at storm.kafka.trident.TridentKafkaEmitter.access$000(TridentKafkaEmitter.java:46)
	at storm.kafka.trident.TridentKafkaEmitter.emitPartitionBatch(TridentKafkaEmitter.java:204)
	at storm.kafka.trident.TridentKafkaEmitter.emitPartitionBatch(TridentKafkaEmitter.java:194)
	at storm.trident.spout.OpaquePartitionedTridentSpoutExecutor$Emitter.emitBatch(OpaquePartitionedTridentSpoutExecutor.java:127)
	at storm.trident.spout.TridentSpoutExecutor.execute(TridentSpoutExecutor.java:82)
	at storm.trident.topology.TridentBoltExecutor.execute(TridentBoltExecutor.java:370)
	at backtype.storm.daemon.executor$fn__5694$tuple_action_fn__5696.invoke(executor.clj:690)
	at backtype.storm.daemon.executor$mk_task_receiver$fn__5615.invoke(executor.clj:436)
	at backtype.storm.disruptor$clojure_handler$reify__5189.onEvent(disruptor.clj:58)
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:127)
	... 6 more
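The trace shows the exception coming out of KafkaUtils.fetchMessages. My understanding (not verified against this exact storm-kafka release) is that it signals Kafka answering a fetch with an offset-out-of-range error, for example when the offset stored for a partition points into a log segment that retention has already deleted. Below is a minimal, hypothetical sketch of that condition and of the usual recovery, falling back to the configured start position. OffsetRangeSketch and resolveFetchOffset are illustrative names, not storm-kafka API; only the sentinel values -2 and -1 match kafka.api.OffsetRequest.EarliestTime() and LatestTime().

```java
/**
 * Hypothetical model of an offset-out-of-range fetch and a fallback to the
 * configured start position. This is NOT the storm-kafka implementation.
 */
class OffsetRangeSketch {

    // Same sentinel values as kafka.api.OffsetRequest.EarliestTime() / LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /**
     * Returns the offset to fetch from. A fetch at exactly the log-end offset
     * is legal (it simply returns no messages), so the valid range is
     * [earliest, latest]. Anything outside it is "out of range".
     */
    static long resolveFetchOffset(long stored, long earliest, long latest, long startOffsetTime) {
        if (stored >= earliest && stored <= latest) {
            return stored; // stored offset still exists on the broker
        }
        // Out of range: fall back to the configured start position.
        if (startOffsetTime == EARLIEST_TIME) {
            return earliest;
        }
        if (startOffsetTime == LATEST_TIME) {
            return latest;
        }
        throw new IllegalArgumentException("sketch only handles EARLIEST_TIME or LATEST_TIME");
    }

    public static void main(String[] args) {
        // Assume retention has deleted everything before offset 1000 and the log end is 5000.
        long earliest = 1_000L;
        long latest = 5_000L;
        System.out.println(resolveFetchOffset(2_500L, earliest, latest, EARLIEST_TIME)); // 2500: still valid
        System.out.println(resolveFetchOffset(400L, earliest, latest, EARLIEST_TIME));   // 1000: reset to earliest
        System.out.println(resolveFetchOffset(400L, earliest, latest, LATEST_TIME));     // 5000: reset to latest
    }
}
```

Whether such a reset is acceptable depends on the topology: resetting to the earliest offset replays everything still retained, while resetting to the latest skips the gap.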

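Based on some posts, I tried setting the following on spoutConfig:

spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

My Kafka retention is the default 168 hours (7 days), and the Kafka producer sends about 6800 messages/second to the Storm/Trident topology. I have gone through most of the related posts, but none of them seem to solve this problem. What is the best way to handle this issue?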
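To put the retention figures in perspective, here is a back-of-the-envelope sketch. It assumes a constant rate of 6800 messages/second and treats the topic as a single partition (with N partitions, divide the rate by N to reason about per-partition offsets). RetentionEstimate and its methods are hypothetical helpers; the idea is only that a consumer lagging by roughly one retention window of messages is reading data old enough to be deleted.

```java
/**
 * Hypothetical back-of-the-envelope helpers for the figures in the question.
 * Assumes a constant produce rate and a single partition.
 */
class RetentionEstimate {

    static final long SECONDS_PER_HOUR = 3_600L;

    /** Messages written during the retention window at a constant rate. */
    static long messagesInWindow(long messagesPerSecond, long retentionHours) {
        return messagesPerSecond * retentionHours * SECONDS_PER_HOUR;
    }

    /**
     * True if a consumer lagging by lagMessages is, under the constant-rate
     * assumption, reading data at least as old as the retention window,
     * i.e. data that retention may already have deleted.
     */
    static boolean likelyExpired(long lagMessages, long messagesPerSecond, long retentionHours) {
        return lagMessages >= messagesInWindow(messagesPerSecond, retentionHours);
    }

    public static void main(String[] args) {
        long window = messagesInWindow(6_800L, 168L);
        System.out.println(window);                                   // 4112640000
        System.out.println(likelyExpired(1_000_000L, 6_800L, 168L)); // false
    }
}
```

Compared with that window of roughly 4.1 billion messages, Long.MAX_VALUE (about 9.2 × 10^18) is so large that the maxOffsetBehind value tried above places no practical limit on lag.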

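I still don't know what caused this issue. Basically, though, we had not been shutting down Storm, ZooKeeper, and Kafka gracefully. This caused the Storm topology to fail, and we had to tear down the whole cluster and rebuild it. Upgrading to Storm 0.10.0 helped resolve some other issues.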