Storm cluster mode: distributed bolt/worker load sharing

Hi: I have a high-volume Storm analysis task. I want to spread many bolts/workers across different nodes/machines so that each machine shares the load, and I'd like to know how to write the bolts/workers/topology so they can communicate with each other. In the code below I submit the topology on one machine; how do I write the bolt/worker/config on the other machines so that the topology knows about the bolts/workers on those machines? I assume I can't submit the topology on one machine and then submit the same topology again on the others. Any tips on Storm worker load sharing?

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class StormClusterMain {
    private static final String SPOUTNAME="KafkaSpout"; 
    private static final String ANALYSISBOLT = "ClusterAnalysisWorker";
    private static final String CLIENTID = "ClusterStorm";
    private static final String TOPOLOGYNAME = "ClusterTopology";

    private static class AppAnalysisBolt extends BaseRichBolt {
        private static final long serialVersionUID = -6885792881303198646L;
        private static final String collectionName="clusterusers";
        private OutputCollector _collector;
        private AtomicInteger index = new AtomicInteger(0); 
        private static final Logger boltLogger = LoggerFactory.getLogger(AppAnalysisBolt.class); 

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {  
            boltLogger.error("Message received:"+tuple.getString(0));
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }


    }

   public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException{

       String zookeepers = null;
       String topicName = null;
       if(args.length == 2 ){
           zookeepers = args[0];
           topicName = args[1];
       }else{
           System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
           System.out.println("Usage :.xxx");
           System.exit(-1);
       }        

       SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
            topicName,
            "",// zookeeper root path for offset storing
            CLIENTID);
       spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
       KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

       TopologyBuilder builder = new TopologyBuilder();
       builder.setSpout(SPOUTNAME, kafkaSpout, 1);
       builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt())
              .shuffleGrouping(SPOUTNAME);

        //Configuration
        Config conf = new Config();
        conf.setDebug(false);
        //Topology run
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(TOPOLOGYNAME, conf, builder.createTopology());
    }
}

You're done, unless something goes wrong.

When you submit a topology to Storm, the Nimbus service looks at the load on the cluster via the Supervisor processes distributed across it. Nimbus then allocates a certain amount of resources for the topology to run in. Those resources are generally spread across the various supervisor nodes in the cluster, and they will process tuples in parallel. Nimbus occasionally revisits those decisions and changes which nodes process what, in an attempt to keep the load across the cluster balanced. As a user, you should never have to notice that process.

Assuming your Storm cluster is set up correctly, the only thing you need to do is submit the topology. Storm handles the whole multi-node parallel-processing thing for you.
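Concretely, the degree of parallelism is declared entirely at build/submit time, via worker counts and parallelism hints; a sketch using the names from the question's code (the specific counts here are illustrative, not recommendations):

```java
// Parallelism is declared when the topology is built; Storm then
// spreads these executors across the cluster's worker slots on its own.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);              // 1 spout executor
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(), 6)  // 6 bolt executors
       .setNumTasks(12)                                  // 12 tasks, so executors can be rebalanced later
       .shuffleGrouping(SPOUTNAME);                      // tuples randomly distributed across bolt instances

Config conf = new Config();
conf.setNumWorkers(3);  // 3 worker JVMs; Nimbus decides which supervisor nodes host them
```

You can also change the worker/executor counts of a running topology with the `storm rebalance` CLI command without resubmitting.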

That said, the AtomicInteger you have will behave strangely, since Storm partitions your code across multiple servers, and even across multiple JVMs on a single host. If you need individual Storm processes to know about the state of the larger cluster, you're best off externalizing that state to some kind of independent datastore (e.g. Redis or HBase).
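A minimal sketch of what externalizing that counter might look like. The `GlobalCounter` interface and `InMemoryCounter` class are hypothetical names invented for illustration; a real deployment would implement the interface against Redis (e.g. Jedis's `incr`) or HBase instead of the in-memory stand-in:

```java
import java.util.concurrent.atomic.AtomicLong;

// A cluster-wide counter must live outside any single bolt's JVM.
// Hypothetical abstraction: a Redis-backed implementation would call
// something like jedis.incr("cluster:index") here instead.
interface GlobalCounter {
    long incrementAndGet();
}

// In-memory stand-in for local testing only -- it has exactly the
// single-JVM limitation described above, since each worker JVM
// would get its own independent copy.
class InMemoryCounter implements GlobalCounter {
    private final AtomicLong value = new AtomicLong(0);

    public long incrementAndGet() {
        return value.incrementAndGet();
    }
}
```

The bolt would then hold a `GlobalCounter` instead of an `AtomicInteger`, so every bolt instance on every node increments the same shared value.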