在 Apache Storm bolt 中使用 Apache Camel ProducerTemplate
Using Apache Camel ProducerTemplate in Apache Storm bolt
我正在尝试编写简单的 Storm + Camel 项目。
我的 Storm 拓扑分析推文,一个螺栓应该将推文文本发送到 apache camel 路由,后者又使用 websocket 通知一些 webapp。
由于在尝试使用 build once CamelContext 时从螺栓收到 NotSerializableExceptions,我无法使其工作。
我已经尝试过的:
- 在 bolt 的构造函数中传递 CamelContext - 导致 NotSerializableException
在 storm conf 中传递 CamelContext,并在 bolt 的 prepare(...) 方法中使用它来获取它。结果:
14484 [main] 错误 org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] 已死
java.lang.IllegalArgumentException: Topology conf 不是 json-serializable
在 backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4]
在 backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]
在 backtype.storm.LocalCluster.submitTopology(未知来源)~[storm-core-0.9.4.jar:0.9.4]
骆驼路线:
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:main")
.to("websocket:localhost:8085/main?sendToAll=true");
}
}
风暴拓扑:
Tweet Spout 正在使用 twitter4j stremaing 传播推文 API.
public class TwitterStreamTopology {
public static void main(String[] args) {
CamelContext producerTemplate = new RouteStarter().buildRoute();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
Config conf = new Config();
conf.put("producerTemplate", producerTemplate.createProducerTemplate());
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(20000);
cluster.shutdown();
}
}
WebsocketBolt:
public class WebSocketBolt extends BaseBasicBolt {
private ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
}
}
有没有办法很好地做到这一点?
或者我应该让 camel 路由被 http 访问,并在 bolt prepare(...) 方法中创建一些 HttpClient 吗?这看起来仍然有点矫枉过正,必须有一种方法让它变得更容易。
感谢大家的帮助!
你的问题的根本原因是你将 ProducerTemplate 添加到你的 storm 配置中并且它抛出异常,因为它不可序列化。如果那是你自己的 class,你可以更改代码以使其工作,但由于那是 Camel class 我会推荐一种不同的方法。
- WebSocketBolt:将您的 producerTemplate 私有成员更改为瞬态:
private transient ProducerTemplate producerTemplate;
这样它就不会尝试被序列化(与将它放入 conf 时遇到的问题相同)。
- WebSocketBolt:在准备方法中而不是在拓扑中初始化 producerTemplate。
像这样:
public class WebSocketBolt extends BaseBasicBolt {
private transient ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
CamelContext producerTemplate = new RouteStarter().buildRoute();
this.producerTemplate = producerTemplate.createProducerTemplate();
}
}
我正在尝试编写简单的 Storm + Camel 项目。 我的 Storm 拓扑分析推文,一个螺栓应该将推文文本发送到 apache camel 路由,后者又使用 websocket 通知一些 webapp。
由于在尝试使用 build once CamelContext 时从螺栓收到 NotSerializableExceptions,我无法使其工作。
我已经尝试过的:
- 在 bolt 的构造函数中传递 CamelContext - 导致 NotSerializableException
在 storm conf 中传递 CamelContext,并在 bolt 的 prepare(...) 方法中使用它来获取它。结果:
14484 [main] 错误 org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] 已死 java.lang.IllegalArgumentException: Topology conf 不是 json-serializable 在 backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4] 在 backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4] 在 backtype.storm.LocalCluster.submitTopology(未知来源)~[storm-core-0.9.4.jar:0.9.4]
骆驼路线:
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:main")
.to("websocket:localhost:8085/main?sendToAll=true");
}
}
风暴拓扑: Tweet Spout 正在使用 twitter4j stremaing 传播推文 API.
public class TwitterStreamTopology {
public static void main(String[] args) {
CamelContext producerTemplate = new RouteStarter().buildRoute();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
Config conf = new Config();
conf.put("producerTemplate", producerTemplate.createProducerTemplate());
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(20000);
cluster.shutdown();
}
}
WebsocketBolt:
public class WebSocketBolt extends BaseBasicBolt {
private ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
}
}
有没有办法很好地做到这一点?
或者我应该让 camel 路由被 http 访问,并在 bolt prepare(...) 方法中创建一些 HttpClient 吗?这看起来仍然有点矫枉过正,必须有一种方法让它变得更容易。
感谢大家的帮助!
你的问题的根本原因是你将 ProducerTemplate 添加到你的 storm 配置中并且它抛出异常,因为它不可序列化。如果那是你自己的 class,你可以更改代码以使其工作,但由于那是 Camel class 我会推荐一种不同的方法。
- WebSocketBolt:将您的 producerTemplate 私有成员更改为瞬态:
private transient ProducerTemplate producerTemplate;
这样它就不会尝试被序列化(与将它放入 conf 时遇到的问题相同)。 - WebSocketBolt:在准备方法中而不是在拓扑中初始化 producerTemplate。
像这样:
public class WebSocketBolt extends BaseBasicBolt {
private transient ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
CamelContext producerTemplate = new RouteStarter().buildRoute();
this.producerTemplate = producerTemplate.createProducerTemplate();
}
}