风暴集群模式,分布式bolt/worker负载分担
storm cluster mode, distributed bolt/worker load sharing
HI:我有一个大容量风暴分析任务。对我来说,我想分拆许多 bolt/workers 到不同的 nodes/machines 来承担任务,以便每台机器都可以分担负载。我想知道如何编写 bolt/workers/topology 以便它们可以相互通信。在下面的代码中,我在一台机器上提交拓扑,如何在其他机器上写bolt/worker/config,以便拓扑知道其他机器的bolt/worker。我想我不能在一台机器上提交拓扑并在其他机器上提交相同的拓扑。
关于风暴工作者负载共享的任何提示?
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class StormClusterMain {
private static final String SPOUTNAME="KafkaSpout";
private static final String ANALYSISBOLT = "ClusterAnalysisWorker";
private static final String CLIENTID = "ClusterStorm";
private static final String TOPOLOGYNAME = "ClusterTopology";
private static class AppAnalysisBolt extends BaseRichBolt {
private static final long serialVersionUID = -6885792881303198646L;
private static final String collectionName="clusterusers";
private OutputCollector _collector;
private AtomicInteger index = new AtomicInteger(0);
private static final Logger boltLogger = LoggerFactory.getLogger(AppAnalysisBolt.class);
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
boltLogger.error("Message received:"+tuple.getString(0));
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException{
String zookeepers = null;
String topicName = null;
if(args.length == 2 ){
zookeepers = args[0];
topicName = args[1];
}else{
System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
System.out.println("Usage :.xxx");
System.exit(-1);
}
SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
topicName,
"",// zookeeper root path for offset storing
CLIENTID);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt())
.shuffleGrouping(SPOUTNAME);
//Configuration
Config conf = new Config();
conf.setDebug(false);
//Topology run
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());
你已经完成了,除非出现问题。
当您向 Storm 提交拓扑时,Nimbus 服务会通过分布在整个集群中的 Supervisor 进程查看集群上的负载。 Nimbus 然后为 运行 的拓扑提供一定数量的资源。这些资源通常分布在集群中的各个主管节点中,它们将并行处理元组。 Nimbus 偶尔会重新审视这些决策并更改哪些节点处理哪些内容,以试图保持集群中的负载平衡。作为用户,您永远不应该注意到该过程。
假设您的 Storm 集群设置正确,您唯一需要做的就是提交拓扑。 Storm 会为您处理整个多节点并行处理的事情。
就是说,您拥有的 AtomicInteger 会表现得很奇怪,因为风暴将您的代码分割到多个服务器,甚至单个主机上的多个 JVM。如果您想解决单个风暴进程需要了解较大集群状态的情况,最好将其外部化到某种独立的数据存储(即 redis 或 hbase)。
HI:我有一个大容量风暴分析任务。对我来说,我想分拆许多 bolt/workers 到不同的 nodes/machines 来承担任务,以便每台机器都可以分担负载。我想知道如何编写 bolt/workers/topology 以便它们可以相互通信。在下面的代码中,我在一台机器上提交拓扑,如何在其他机器上写bolt/worker/config,以便拓扑知道其他机器的bolt/worker。我想我不能在一台机器上提交拓扑并在其他机器上提交相同的拓扑。 关于风暴工作者负载共享的任何提示?
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class StormClusterMain {
private static final String SPOUTNAME="KafkaSpout";
private static final String ANALYSISBOLT = "ClusterAnalysisWorker";
private static final String CLIENTID = "ClusterStorm";
private static final String TOPOLOGYNAME = "ClusterTopology";
private static class AppAnalysisBolt extends BaseRichBolt {
private static final long serialVersionUID = -6885792881303198646L;
private static final String collectionName="clusterusers";
private OutputCollector _collector;
private AtomicInteger index = new AtomicInteger(0);
private static final Logger boltLogger = LoggerFactory.getLogger(AppAnalysisBolt.class);
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
boltLogger.error("Message received:"+tuple.getString(0));
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException{
String zookeepers = null;
String topicName = null;
if(args.length == 2 ){
zookeepers = args[0];
topicName = args[1];
}else{
System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
System.out.println("Usage :.xxx");
System.exit(-1);
}
SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
topicName,
"",// zookeeper root path for offset storing
CLIENTID);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt())
.shuffleGrouping(SPOUTNAME);
//Configuration
Config conf = new Config();
conf.setDebug(false);
//Topology run
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());
你已经完成了,除非出现问题。
当您向 Storm 提交拓扑时,Nimbus 服务会通过分布在整个集群中的 Supervisor 进程查看集群上的负载。 Nimbus 然后为 运行 的拓扑提供一定数量的资源。这些资源通常分布在集群中的各个主管节点中,它们将并行处理元组。 Nimbus 偶尔会重新审视这些决策并更改哪些节点处理哪些内容,以试图保持集群中的负载平衡。作为用户,您永远不应该注意到该过程。
假设您的 Storm 集群设置正确,您唯一需要做的就是提交拓扑。 Storm 会为您处理整个多节点并行处理的事情。
就是说,您拥有的 AtomicInteger 会表现得很奇怪,因为风暴将您的代码分割到多个服务器,甚至单个主机上的多个 JVM。如果您想解决单个风暴进程需要了解较大集群状态的情况,最好将其外部化到某种独立的数据存储(即 redis 或 hbase)。