apache 风暴,负载平衡,json
apache storm, load balance, json
我正在使用kafka storm,kafka sends/emits json string来进行storm,在storm中,我想根据key/field中的key/field将负载分配给几个worker json。怎么做?在我的例子中,它是 json 字符串中的 groupid 字段。
例如,我有 json 这样的:
{groupid: 1234, userid: 145, comments:"I want to distribute all this group 1234 to one worker", size:50,type:"group json"}
{groupid: 1235, userid: 134, comments:"I want to distribute all this group 1234 to another worker", size:90,type:"group json"}
{groupid: 1234, userid: 158, comments:"I want to be sent to same worker as group 1234", size:50,type:"group json"}
我也尝试使用以下代码:
1. TopologyBuilder builder = new TopologyBuilder();
2. builder.setSpout(SPOUTNAME, kafkaSpout, 1);
3. builder.setBolt(MYDISTRIBUTEDWORKER, new DistributedBolt()).setFieldsGroup(SPOUTNAME,new Fields("groupid")); <---???
我想知道如何在第 3 行的 setFieldsGroup 方法中添加参数。有人可以给我提示吗?
朱哈尼
==使用 storm 0.9.4 进行测试 ============
=============源代码==============
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class KafkaBoltMain {
private static final String SPOUTNAME="TopicSpout";
private static final String ANALYSISBOLT = "AnalysisWorker";
private static final String CLIENTID = "Storm";
private static final String TOPOLOGYNAME = "LocalTopology";
private static class AppAnalysisBolt extends BaseRichBolt {
private static final long serialVersionUID = -6885792881303198646L;
private OutputCollector _collector;
private long groupid=-1L;
private String log="test";
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
List<Object> objs = tuple.getValues();
int i=0;
for(Object obj:objs){
System.out.println(""+i+"th object's value is:"+obj.toString());
i++;
}
// _collector.emit(new Values(groupid,log));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("groupid","log"));
}
}
public static void main(String[] args){
String zookeepers = null;
String topicName = null;
if(args.length == 2 ){
zookeepers = args[0];
topicName = args[1];
}else if(args.length == 1 && args[0].equalsIgnoreCase("help")){
System.out.println("xxxx");
System.exit(0);
}
else{
System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
System.out.println("xxxx");
System.exit(-1);
}
SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
topicName,
"",// zookeeper root path for offset storing
CLIENTID);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(),2)
.fieldsGrouping(SPOUTNAME,new Fields("groupid"));
//Configuration
Config conf = new Config();
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
}
}
============================================= =====
当我开始提交拓扑(本地集群)时,它
给出以下错误:
11658 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x14d097d338c0009 with negotiated timeout 20000 for client /127.0.0.1:34656
11658 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x14d097d338c0009, negotiated timeout = 20000
11659 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
12670 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id ccc57de0-29ff-4cb4-89de-fea1ea9b6e28 at host storm-VirtualBox
12794 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='LocalTopology') #<InvalidTopologyException InvalidTopologyException(msg:Component: [AnalysisWorker] subscribes from stream: [default] of component [TopicSpout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
at backtype.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:178) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.common$system_topology_BANG_.invoke(common.clj:307) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submitTopologyWithOpts(nimbus.clj:948) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submitTopology(nimbus.clj:966) ~[storm-core-0.9.4.jar:0.9.4]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_80]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_80]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_80]
at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_80]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.5.1.jar:na]
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.5.1.jar:na]
at backtype.storm.testing$submit_local_topology.invoke(testing.clj:264) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.4.jar:0.9.4]
at com.callstats.stream.analyzer.KafkaBoltMain.main(KafkaBoltMain.java:94) ~[StreamAnalyzer-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
不知道你用的是哪个版本的Storm,从0.9.4开始,你的需求可以实现如下。
builder.setBolt(MYDISTRIBUTEDWORKER, new DistributedBolt()).fieldsGrouping(SPOUTNAME, new Fields("groupid"));
在DistributedBolt的prepare方法中,
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("groupid", "log"));
}
在它的execute方法的某处,你会调用
collector.emit(new Values(groupid, log));
然后具有相同 groupid 的元组将被传递到下一个螺栓的相同实例。
我正在使用kafka storm,kafka sends/emits json string来进行storm,在storm中,我想根据key/field中的key/field将负载分配给几个worker json。怎么做?在我的例子中,它是 json 字符串中的 groupid 字段。
例如,我有 json 这样的:
{groupid: 1234, userid: 145, comments:"I want to distribute all this group 1234 to one worker", size:50,type:"group json"}
{groupid: 1235, userid: 134, comments:"I want to distribute all this group 1234 to another worker", size:90,type:"group json"}
{groupid: 1234, userid: 158, comments:"I want to be sent to same worker as group 1234", size:50,type:"group json"}
我也尝试使用以下代码:
1. TopologyBuilder builder = new TopologyBuilder();
2. builder.setSpout(SPOUTNAME, kafkaSpout, 1);
3. builder.setBolt(MYDISTRIBUTEDWORKER, new DistributedBolt()).setFieldsGroup(SPOUTNAME,new Fields("groupid")); <---???
我想知道如何在第 3 行的 setFieldsGroup 方法中添加参数。有人可以给我提示吗?
朱哈尼
==使用 storm 0.9.4 进行测试 ============
=============源代码==============
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class KafkaBoltMain {
private static final String SPOUTNAME="TopicSpout";
private static final String ANALYSISBOLT = "AnalysisWorker";
private static final String CLIENTID = "Storm";
private static final String TOPOLOGYNAME = "LocalTopology";
private static class AppAnalysisBolt extends BaseRichBolt {
private static final long serialVersionUID = -6885792881303198646L;
private OutputCollector _collector;
private long groupid=-1L;
private String log="test";
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
List<Object> objs = tuple.getValues();
int i=0;
for(Object obj:objs){
System.out.println(""+i+"th object's value is:"+obj.toString());
i++;
}
// _collector.emit(new Values(groupid,log));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("groupid","log"));
}
}
public static void main(String[] args){
String zookeepers = null;
String topicName = null;
if(args.length == 2 ){
zookeepers = args[0];
topicName = args[1];
}else if(args.length == 1 && args[0].equalsIgnoreCase("help")){
System.out.println("xxxx");
System.exit(0);
}
else{
System.out.println("You need to have two arguments: kafka zookeeper:port and topic name");
System.out.println("xxxx");
System.exit(-1);
}
SpoutConfig spoutConfig = new SpoutConfig(new ZkHosts(zookeepers),
topicName,
"",// zookeeper root path for offset storing
CLIENTID);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUTNAME, kafkaSpout, 1);
builder.setBolt(ANALYSISBOLT, new AppAnalysisBolt(),2)
.fieldsGrouping(SPOUTNAME,new Fields("groupid"));
//Configuration
Config conf = new Config();
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGYNAME, conf, builder.createTopology());
}
}
============================================= ===== 当我开始提交拓扑(本地集群)时,它 给出以下错误:
11658 [SyncThread:0] INFO org.apache.storm.zookeeper.server.ZooKeeperServer - Established session 0x14d097d338c0009 with negotiated timeout 20000 for client /127.0.0.1:34656
11658 [main-SendThread(localhost:2000)] INFO org.apache.storm.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x14d097d338c0009, negotiated timeout = 20000
11659 [main-EventThread] INFO org.apache.storm.curator.framework.state.ConnectionStateManager - State change: CONNECTED
12670 [main] INFO backtype.storm.daemon.supervisor - Starting supervisor with id ccc57de0-29ff-4cb4-89de-fea1ea9b6e28 at host storm-VirtualBox
12794 [main] WARN backtype.storm.daemon.nimbus - Topology submission exception. (topology name='LocalTopology') #<InvalidTopologyException InvalidTopologyException(msg:Component: [AnalysisWorker] subscribes from stream: [default] of component [TopicSpout] with non-existent fields: #{"groupid"})>
12800 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null
at backtype.storm.daemon.common$validate_structure_BANG_.invoke(common.clj:178) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.common$system_topology_BANG_.invoke(common.clj:307) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submitTopologyWithOpts(nimbus.clj:948) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.daemon.nimbus$fn__4290$exec_fn__1754__auto__$reify__4303.submitTopology(nimbus.clj:966) ~[storm-core-0.9.4.jar:0.9.4]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_80]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_80]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_80]
at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_80]
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93) ~[clojure-1.5.1.jar:na]
at clojure.lang.Reflector.invokeInstanceMethod(Reflector.java:28) ~[clojure-1.5.1.jar:na]
at backtype.storm.testing$submit_local_topology.invoke(testing.clj:264) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.LocalCluster.submitTopology(Unknown Source) ~[storm-core-0.9.4.jar:0.9.4]
at com.callstats.stream.analyzer.KafkaBoltMain.main(KafkaBoltMain.java:94) ~[StreamAnalyzer-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
不知道你用的是哪个版本的Storm,从0.9.4开始,你的需求可以实现如下。
builder.setBolt(MYDISTRIBUTEDWORKER, new DistributedBolt()).fieldsGrouping(SPOUTNAME, new Fields("groupid"));
在DistributedBolt的prepare方法中,
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("groupid", "log"));
}
在它的execute方法的某处,你会调用
collector.emit(new Values(groupid, log));
然后具有相同 groupid 的元组将被传递到下一个螺栓的相同实例。