从 Apache Storm Bolt 打印
Print from Apache Storm Bolt
我正在研究一些 Storm 拓扑和螺栓的示例代码,但我 运行 遇到了一些奇怪的事情。我的目标是使用 Storm 设置 Kafka,以便 Storm 可以处理 Kafka 总线上可用的消息。我定义了以下螺栓:
public class ReportBolt extends BaseRichBolt {
private static final long serialVersionUID = 6102304822420418016L;
private Map<String, Long> counts;
private OutputCollector collector;
@Override @SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector outCollector) {
collector = outCollector;
counts = new HashMap<String, Long>();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// terminal bolt = does not emit anything
}
@Override
public void execute(Tuple tuple) {
System.out.println("HELLO " + tuple);
}
@Override
public void cleanup() {
System.out.println("HELLO FINAL");
}
}
本质上,它应该只是输出每条Kafka消息;当调用清理函数时,应该会出现一条不同的消息。
我查看了 worker 日志,找到了最终消息(即 "HELLO FINAL"),但是找不到带有 "HELLO" 的 Kafka 消息。据我所知,这应该是一个简单的打印机螺栓,但我看不出哪里出错了。工作日志表明我已连接到 Kafka 总线(它获取偏移量等)。
简而言之,为什么我的 println
没有出现在工作日志中?
编辑
public class AckedTopology {
private static final String SPOUT_ID = "monitoring_test_spout";
private static final String REPORT_BOLT_ID = "acking-report-bolt";
private static final String TOPOLOGY_NAME = "monitoring-topology";
public static void main(String[] args) throws Exception {
int numSpoutExecutors = 1;
KafkaSpout kspout = buildKafkaSpout();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
builder.setBolt(REPORT_BOLT_ID, reportBolt);
Config cfg = new Config();
StormSubmitter.submitTopology(TOPOLOGY_NAME, cfg, builder.createTopology());
}
private static KafkaSpout buildKafkaSpout() {
String zkHostPort = "URL";
String topic = "TOPIC";
String zkRoot = "/brokers";
String zkSpoutId = "monitoring_test_spout_id";
ZkHosts zkHosts = new ZkHosts(zkHostPort);
SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
return kafkaSpout;
}
}
您的螺栓未与喷口连接。你需要使用风暴的分组才能做到这一点..使用这样的东西
builder.setBolt(REPORT_BOLT_ID, reportBolt).shuffleGrouping(SPOUT_ID);
setBolt
通常 returns 一个 InputDeclarer 对象。在您的情况下,通过指定 shuffleGrouping(SPOUT_ID)
您告诉风暴您有兴趣使用具有 id REPORT_BOLT_ID
.
的组件发出的所有元组
在 stream groupings 上阅读更多内容并根据您的需要选择一个。
我正在研究一些 Storm 拓扑和螺栓的示例代码,但我 运行 遇到了一些奇怪的事情。我的目标是使用 Storm 设置 Kafka,以便 Storm 可以处理 Kafka 总线上可用的消息。我定义了以下螺栓:
public class ReportBolt extends BaseRichBolt {
private static final long serialVersionUID = 6102304822420418016L;
private Map<String, Long> counts;
private OutputCollector collector;
@Override @SuppressWarnings("rawtypes")
public void prepare(Map stormConf, TopologyContext context, OutputCollector outCollector) {
collector = outCollector;
counts = new HashMap<String, Long>();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// terminal bolt = does not emit anything
}
@Override
public void execute(Tuple tuple) {
System.out.println("HELLO " + tuple);
}
@Override
public void cleanup() {
System.out.println("HELLO FINAL");
}
}
本质上,它应该只是输出每条Kafka消息;当调用清理函数时,应该会出现一条不同的消息。
我查看了 worker 日志,找到了最终消息(即 "HELLO FINAL"),但是找不到带有 "HELLO" 的 Kafka 消息。据我所知,这应该是一个简单的打印机螺栓,但我看不出哪里出错了。工作日志表明我已连接到 Kafka 总线(它获取偏移量等)。
简而言之,为什么我的 println
没有出现在工作日志中?
编辑
public class AckedTopology {
private static final String SPOUT_ID = "monitoring_test_spout";
private static final String REPORT_BOLT_ID = "acking-report-bolt";
private static final String TOPOLOGY_NAME = "monitoring-topology";
public static void main(String[] args) throws Exception {
int numSpoutExecutors = 1;
KafkaSpout kspout = buildKafkaSpout();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, kspout, numSpoutExecutors);
builder.setBolt(REPORT_BOLT_ID, reportBolt);
Config cfg = new Config();
StormSubmitter.submitTopology(TOPOLOGY_NAME, cfg, builder.createTopology());
}
private static KafkaSpout buildKafkaSpout() {
String zkHostPort = "URL";
String topic = "TOPIC";
String zkRoot = "/brokers";
String zkSpoutId = "monitoring_test_spout_id";
ZkHosts zkHosts = new ZkHosts(zkHostPort);
SpoutConfig spoutCfg = new SpoutConfig(zkHosts, topic, zkRoot, zkSpoutId);
KafkaSpout kafkaSpout = new KafkaSpout(spoutCfg);
return kafkaSpout;
}
}
您的螺栓未与喷口连接。你需要使用风暴的分组才能做到这一点..使用这样的东西
builder.setBolt(REPORT_BOLT_ID, reportBolt).shuffleGrouping(SPOUT_ID);
setBolt
通常 returns 一个 InputDeclarer 对象。在您的情况下,通过指定 shuffleGrouping(SPOUT_ID)
您告诉风暴您有兴趣使用具有 id REPORT_BOLT_ID
.
在 stream groupings 上阅读更多内容并根据您的需要选择一个。