Streamparse calling next_tuple continuously
I'm trying to write a simple Storm topology in Python using Streamparse. Everything works for me except the simple Kafka spout I wrote — Storm just seems to call next_tuple continuously. My bolts are fairly slow, so the system quickly blows up in memory.
When launching the topology, I tried setting topology.max.spout.pending to 1 to keep it from pushing too many messages into the topology:
lein run -m streamparse.commands.run/-main topologies/.clj -t 100 --option 'topology.max.spout.pending=1' --option 'topology.workers=1' --option 'topology.acker.executors=1'
However, the output still looks like this, even though the bolts are much slower:
24790 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24942 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24944 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
24946 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25143 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25144 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
25350 [Thread-16-metadata-spout] INFO backtype.storm.spout.ShellSpout - Shell msg: ----NEXT TUPLE----
...
My simple Kafka spout:
import json
import os

from kafka import KafkaClient, SimpleConsumer
from streamparse.spout import Spout

class MetadataSpout(Spout):
    def initialize(self, stormconf, context):
        self.log('----CONFIG: %s----' % stormconf)
        k = KafkaClient(os.getenv('KAFKA'))
        self.consumer = SimpleConsumer(k, 'vacuum', 'metadata')

    def next_tuple(self):
        self.log('----NEXT TUPLE----')
        # os.getenv returns a string when the variable is set, so cast it
        messages = self.consumer.get_messages(count=int(os.getenv('BATCH_COUNT', 20)))
        self.emit([json.dumps([m.message.value for m in messages])])
My bolts just use the default configuration but spend a lot of time in their process() method. I don't see how they could be the problem, but I can post them if they're relevant.
Solved, thanks to the great Streamparse team
"topology.max.spout.pending only has an effect if your spout is reliable. You'll need to specify the optional tup_id parameter to emit to give each tuple a unique ID. Once you do that, all should be well."
After giving each emitted tuple a UUID, the problem was solved.
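A minimal, self-contained sketch of the fix. The `Spout` base class and the consumer below are stand-ins so the snippet runs on its own — in the real topology you subclass streamparse's `Spout`, whose `emit()` accepts the same optional `tup_id` keyword; only the `tup_id` argument is the point:

```python
import json
import uuid

# Stand-in for streamparse's Spout base class so this sketch is runnable;
# the real base class's emit() also takes an optional tup_id keyword.
class Spout:
    def emit(self, values, tup_id=None):
        self.last_emit = (values, tup_id)

class MetadataSpout(Spout):
    def __init__(self, consumer):
        # In the real spout this is the SimpleConsumer from initialize()
        self.consumer = consumer

    def next_tuple(self):
        messages = self.consumer.get_messages(count=20)
        # Passing a unique tup_id makes the spout "reliable": Storm tracks
        # each tuple until it is acked or failed, which is what lets
        # topology.max.spout.pending actually throttle next_tuple calls.
        self.emit([json.dumps(messages)], tup_id=str(uuid.uuid4()))
```

Without a tup_id, Storm treats every tuple as unreliable and never counts it as "pending", so the max.spout.pending cap has nothing to measure against.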