How to emit metrics from a remote Storm worker to a Graphite server when using a customized IScheduler?
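I was successfully collecting metrics from Apache Storm into Graphite. Then I developed a custom scheduler that implements the IScheduler interface, and now I cannot collect any metrics.
This is my scheduler: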
public class SiteAwareScheduler implements IScheduler {
    @Override
    public void prepare(Map conf) {
    }

    @Override
    public void schedule(Topologies topologies, Cluster cluster) {
        ....
    }
}
In the storm.yaml file:
supervisor.scheduler.meta:
  site: "cluster"
storm.scheduler: "org.sense.storm.scheduler.SiteAwareScheduler"
I am using this library to collect metrics and send them to the Graphite server.
public class MyTopology {
    config.put(YammerFacadeMetric.FACADE_METRIC_TIME_BUCKET_IN_SEC, 30);
    config.put(SimpleGraphiteStormMetricProcessor.GRAPHITE_HOST, "127.0.0.1");
    config.put(SimpleGraphiteStormMetricProcessor.GRAPHITE_PORT, 2003);
    config.put(SimpleGraphiteStormMetricProcessor.REPORT_PERIOD_IN_SEC, 10);
    config.put(Config.TOPOLOGY_NAME, "MqttSensorSumTopology");
    config.registerMetricsConsumer(MetricReporter.class,
            new MetricReporterConfig(".*", SimpleGraphiteStormMetricProcessor.class.getCanonicalName()), 1);
    ...
    TopologyBuilder topologyBuilder = new TopologyBuilder();
    ...
    topologyBuilder.setBolt(MqttSensors.BOLT_SENSOR_TICKET_SUM.getValue(),
                    new SumSensorValuesWindowBolt(SensorType.COUNTER_TICKETS).withTumblingWindow(Duration.seconds(5)), 1)
            .shuffleGrouping(MqttSensors.SPOUT_STATION_01_TICKETS.getValue())
            .addConfiguration(TagSite.SITE.getValue(), TagSite.CLUSTER.getValue());
}
public class SumSensorValuesWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Metrics are wired up here, inside the worker, where a TopologyContext is available.
        StormYammerMetricsAdapter.configure(stormConf, context, new MetricsRegistry());
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        ...
    }
}
The IScheduler "prepare" method does not receive a TopologyContext, so I don't know where to instantiate my metrics with the new scheduler.
Any hints?
Thanks,
Felipe
I installed Graphite on both machines, the master and the worker, following the instructions in this link. Then I configured the file storm.yaml as it says in this link.
storm.metrics.reporters:
  # Graphite Reporter
  - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter"
    daemons:
      - "supervisor"
      - "nimbus"
      - "worker"
    report.period: 60
    report.period.units: "SECONDS"
    graphite.host: "localhost"
    graphite.port: 2003
The metrics then appear on the Graphite server under my topology name, MqttSensorSumTopology-1-1553506290.
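If the metrics still do not show up for a remote worker, it may help to first confirm that the worker machine can reach the Graphite (Carbon) listener at all. With graphite.host set to "localhost", each daemon reports to whatever is listening on its own machine. The sketch below is only a connectivity probe under stated assumptions: GraphiteProbe and the metric path storm.probe.connectivity are hypothetical names, not part of Storm or of the metrics library above, and it assumes the listener on port 2003 accepts Graphite's plaintext protocol, one "path value timestamp" line per data point.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for manual testing; not part of Storm or any Graphite client library.
final class GraphiteProbe {

    // Builds one line of Graphite's plaintext protocol: "<path> <value> <epochSeconds>\n".
    static String formatLine(String path, long value, long epochSeconds) {
        return path + " " + value + " " + epochSeconds + "\n";
    }

    // Opens a TCP connection to the plaintext listener and writes a single data point.
    static void send(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket()) {
            // Fail after 5 seconds instead of hanging when the host is unreachable.
            socket.connect(new InetSocketAddress(host, port), 5000);
            OutputStream out = socket.getOutputStream();
            out.write(line.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    // Usage: java GraphiteProbe [host] [port]; defaults match the storm.yaml above.
    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 2003;
        long now = System.currentTimeMillis() / 1000L;
        send(host, port, formatLine("storm.probe.connectivity", 1, now));
    }
}
```

Running it on the worker machine and then looking for storm.probe.connectivity in the Graphite tree separates a network or listener problem from a Storm reporter configuration problem. An IOException here would point at the former.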