Storm Bolt not printing/logging Kafka Spout
EDIT: I added an .ack() to the Bolt (this required me to use a Rich Bolt instead of a Basic Bolt) and I'm having the same problem: nothing tells me that tuples are being processed by the Bolt.
In case it matters, I'm running this on a CentOS image on an EC2 instance. Any help would be appreciated.
I'm trying to set up a very basic HelloWorld Storm example that reads messages from a Kafka cluster and prints/logs the messages it receives.
Currently I have 20 messages in the Kafka cluster. When I run the topology (it appears to start fine), I can see my Kafka Spout and Echo Bolt. In the Storm UI, the Kafka Spout's Acked column has a value of 20, which I assume is the number of messages it was able to read/access (?).
The Echo Bolt row, however, only shows that I have 1 executor and 1 task. All other columns are 0.
Looking at the generated Storm worker logs, I see this line: Read partition information from: /HelloWorld Spout/partition_0 --> {"topic":"helloworld","partition":0,"topology":{"id":"<UUID>","name":"Kafka-Storm test"},"broker":{"port":6667,"host":"ip-10-0-0-35.ec2.internal"},"offset":20}
The next few lines are as follows:
s.k.PartitionManager [INFO] Last commit offset from zookeeper: 0
s.k.PartitionManager [INFO] Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
s.k.PartitionManager [INFO] Starting Kafka ip-10-0-0-35.ec2.internal:0 from offset 0
s.k.ZkCoordinator [INFO] Task [1/1] Finished refreshing
s.k.ZkCoordinator [INFO] Task [1/1] Refreshing partition manager connections
s.k.DynamicBrokersReader [INFO] Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=ip-10-0-0-35.ec2.internal:6667}}
The rest of the worker log shows no logging/printing of any messages processed by the Bolt. I'm at a loss as to why the Bolt doesn't seem to be receiving any messages from the Kafka cluster. Any help would be great. Thanks.
Building the KafkaSpout
private static KafkaSpout setupSpout() {
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    // Topic "helloworld", ZooKeeper root "", spout id "HelloWorld Spout"
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "helloworld", "", "HelloWorld Spout");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    // Always start reading from the earliest offset in the topic
    spoutConfig.forceFromStart = true;
    spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    return new KafkaSpout(spoutConfig);
}
Building and submitting the topology
public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("Kafka Spout", setupSpout());
    builder.setBolt("Echo Bolt", new SystemOutEchoBolt());
    try {
        System.setProperty("storm.jar", "/tmp/storm.jar");
        StormSubmitter.submitTopology("Kafka-Storm test", new Config(), builder.createTopology());
    } catch (Exception e) {
        // catch exceptions here (e.g. AlreadyAliveException, InvalidTopologyException)
    }
}
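(Aside: a quick way to see the bolt's printlns without digging through worker logs is to run the same builder in local mode. A minimal sketch, assuming the Storm 0.9.x backtype.storm API used above; LocalCluster runs the topology in-process, so bolt output goes straight to the console:)

// Local-mode alternative to StormSubmitter, for debugging only;
// assumes the enclosing method declares "throws Exception"
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Kafka-Storm test", new Config(), builder.createTopology());
Thread.sleep(30000); // give the spout time to consume and the bolt time to print
cluster.killTopology("Kafka-Storm test");
cluster.shutdown();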
The Bolt
public class SystemOutEchoBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(SystemOutEchoBolt.class);

    private OutputCollector m_collector;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map _map, TopologyContext _context, OutputCollector _collector) {
        m_collector = _collector;
    }

    @Override
    public void execute(Tuple _tuple) {
        System.out.println("Printing tuple with toString(): " + _tuple.toString());
        System.out.println("Printing tuple with getString(): " + _tuple.getString(0));
        logger.info("Logging tuple with logger: " + _tuple.getString(0));
        m_collector.ack(_tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer _declarer) {}
}
You need to call ack or fail on the tuple in your bolts; otherwise the spout has no way of knowing that the tuple has been fully processed. This causes the count issues you're seeing. A BaseBasicBolt handles the acking for you automatically:
public class SystemOutEchoBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(SystemOutEchoBolt.class);

    @Override
    public void execute(Tuple _tuple, BasicOutputCollector _collector) {
        System.out.println("Printing tuple with toString(): " + _tuple.toString());
        System.out.println("Printing tuple with getString(): " + _tuple.getString(0));
        logger.info("Logging tuple with logger: " + _tuple.getString(0));
        // No explicit ack needed: BaseBasicBolt acks the tuple automatically
        // after execute() returns (BasicOutputCollector has no ack method).
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {}
}
The answer was simple. I was never telling the bolt which stream to subscribe to. Adding .shuffleGrouping("Kafka Spout") fixed the problem.
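For reference, the corrected wiring in main() looks like this (same component ids as above; the grouping subscribes the bolt to the spout's default output stream):

builder.setBolt("Echo Bolt", new SystemOutEchoBolt())
       .shuffleGrouping("Kafka Spout"); // subscribe the bolt to the spout's tuples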