prepare 方法执行多次
prepare method executing multiple times
您好,我正在使用 apache-storm 创建拓扑,其中我的 Spout 从 Kakfa 主题收集数据并将其发送到螺栓。
我正在对元组进行一些验证并再次为其他螺栓发出流。
现在的问题是我的第二个螺栓使用第一个螺栓的流有一个重载方法prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector)
假设每 2 秒执行一次。
拓扑代码是
topologyBuilder.setBolt("abc",new ValidationBolt()).shuffleGrouping(configurations.SPOUT_ID);
topologyBuilder.setBolt("TEST",new TestBolt()).shuffleGrouping("abc",Utils.VALIDATED_STREAM);
第一个螺栓 "abc" 的代码是
@Override
public void execute(Tuple tuple) {
String document = String.valueOf(tuple.getValue(4));
if (Utils.isJSONValid(document)) {
outputCollector.emit(Utils.VALIDATED_STREAM,new Values(document));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream(Utils.VALIDATED_STREAM,new Fields("document"));
}
我在搜索时发现了
The prepare method is called when the bolt is initialised and is
similar to the open method in spout. It is called only once for the bolt.
It gets the configuration for the bolt and also the context of the bolt.
The collector is used to emit or output the tuples from this bolt.
Link 到 public 日志要点
Storm topology log
您的日志显示您正在使用 LocalCluster。这是一个 testing/demo 工具,请勿将其用于生产工作负载。而是建立一个真正的分布式集群。
关于正在发生的事情:
当您在 LocalCluster 中使用 运行 拓扑时,Storm 通过 运行 将所有组件(Nimbus、Supervisors 和 worker)作为单个 JVM 中的线程来模拟真实的集群。您的日志显示这些行:
20:14:12.451 [SLOT_1027] INFO o.a.s.ProcessSimulator - Begin killing process 2ea97301-24c9-4c1a-bcba-61008693971a
20:14:12.451 [SLOT_1027] INFO o.a.s.d.w.Worker - Shutting down worker smart-transactional-data-1-1566571315 72bbf510-c342-4385-9599-0821a2dee94e 1027
20:14:15.518 [SLOT_1027] INFO o.a.s.d.s.Slot - STATE running msInState: 33328 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a -> kill-blob-update msInState: 3001 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a
20:14:15.540 [SLOT_1027] INFO o.a.s.d.w.Worker - Launching worker for smart-transactional-data-1-1566571315
LocalCluster 正在关闭其中一个模拟工作器,因为 blob 存储区中的一个 blob(例如拓扑 jar、拓扑配置、其他类型的共享文件,请参阅 https://storm.apache.org/releases/2.0.0/distcache-blobstore.html 了解更多)。通常当这种情况发生在真实集群中时,worker JVM 将被终止,blob 将被更新并且 worker 将重新启动。由于您使用的是 LocalCluster,它只会终止工作线程并重新启动它。这就是为什么您会看到多次调用 prepare
.
您好,我正在使用 apache-storm 创建拓扑,其中我的 Spout 从 Kakfa 主题收集数据并将其发送到螺栓。
我正在对元组进行一些验证并再次为其他螺栓发出流。
现在的问题是我的第二个螺栓使用第一个螺栓的流有一个重载方法prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector)
假设每 2 秒执行一次。
拓扑代码是
topologyBuilder.setBolt("abc",new ValidationBolt()).shuffleGrouping(configurations.SPOUT_ID);
topologyBuilder.setBolt("TEST",new TestBolt()).shuffleGrouping("abc",Utils.VALIDATED_STREAM);
第一个螺栓 "abc" 的代码是
@Override
public void execute(Tuple tuple) {
String document = String.valueOf(tuple.getValue(4));
if (Utils.isJSONValid(document)) {
outputCollector.emit(Utils.VALIDATED_STREAM,new Values(document));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream(Utils.VALIDATED_STREAM,new Fields("document"));
}
我在搜索时发现了
The prepare method is called when the bolt is initialised and is
similar to the open method in spout. It is called only once for the bolt.
It gets the configuration for the bolt and also the context of the bolt.
The collector is used to emit or output the tuples from this bolt.
Link 到 public 日志要点 Storm topology log
您的日志显示您正在使用 LocalCluster。这是一个 testing/demo 工具,请勿将其用于生产工作负载。而是建立一个真正的分布式集群。
关于正在发生的事情:
当您在 LocalCluster 中使用 运行 拓扑时,Storm 通过 运行 将所有组件(Nimbus、Supervisors 和 worker)作为单个 JVM 中的线程来模拟真实的集群。您的日志显示这些行:
20:14:12.451 [SLOT_1027] INFO o.a.s.ProcessSimulator - Begin killing process 2ea97301-24c9-4c1a-bcba-61008693971a
20:14:12.451 [SLOT_1027] INFO o.a.s.d.w.Worker - Shutting down worker smart-transactional-data-1-1566571315 72bbf510-c342-4385-9599-0821a2dee94e 1027
20:14:15.518 [SLOT_1027] INFO o.a.s.d.s.Slot - STATE running msInState: 33328 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a -> kill-blob-update msInState: 3001 topo:smart-transactional-data-1-1566571315 worker:2ea97301-24c9-4c1a-bcba-61008693971a
20:14:15.540 [SLOT_1027] INFO o.a.s.d.w.Worker - Launching worker for smart-transactional-data-1-1566571315
LocalCluster 正在关闭其中一个模拟工作器,因为 blob 存储区中的一个 blob(例如拓扑 jar、拓扑配置、其他类型的共享文件,请参阅 https://storm.apache.org/releases/2.0.0/distcache-blobstore.html 了解更多)。通常当这种情况发生在真实集群中时,worker JVM 将被终止,blob 将被更新并且 worker 将重新启动。由于您使用的是 LocalCluster,它只会终止工作线程并重新启动它。这就是为什么您会看到多次调用 prepare
.