如何在风暴拓扑中使用流口水
how to use drools in storm topology
现在想在blot中使用Drools,在LocalCluster上正常,但是放到生产集群上就出错了。
印迹是:
public class DealLostBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
private OutputCollector collector;
private KieSession kieSession;
private FactHandle factHandle;
@Override
public void execute(Tuple input) {
// 获取数据
String sentence = (String) input.getValue(0);
LOGGER.info("DealLostBolt获取到的数据:" + sentence);
// 数据转换
PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
KieServices ks = KieServices.Factory.get();
KieContainer kieContainer = ks.getKieClasspathContainer();
kieSession = kieContainer.newKieSession("all-rule");
kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
factHandle = kieSession.insert(dataPoint);
kieSession.fireAllRules();
kieSession.delete(factHandle);
collector.emit(new Values(sentence));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("value"));
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
}
我是用官方文档创建的kiesession。
错误是:
java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.NullPointerException
at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
... 6 more
也许有些东西没有初始化。
但是我在 blot 执行时创建了一个新的 kieservice。
有人可以帮帮我吗?
谢谢!
我在使用 Drools 和 JMH as shaded jar. Drools uses a ServiceRegistry 方法时遇到了类似的问题。这意味着 Drools 库(drools-compiler、kie-ci、drools-decisiontables,...)包含同名的 属性 文件,指示接口的实现他们提供。
阴影 jar 插件通常将(传递的)依赖项ci扁平化到一个 jar 中。对于多次存在的文件,这通常意味着如果没有另外指定,则选择其中一个。对于 ServiceRegistry 属性,我们需要合并所有文件。通常这是通过 ServicesResourceTransformer. This Transformer handles files in META-INF/services/
, but the relevant file for Drools is META-INF/kie.conf
. My problem with JMH could be solved with the AppendingTransformer:
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/kie.conf</resource>
</transformer>
我不是 Storm 的专家,但 Starter 表明它也使用了 shade 插件。我假设您 运行 来自 IDE 的本地集群 - 它不使用阴影 jar。
现在想在blot中使用Drools,在LocalCluster上正常,但是放到生产集群上就出错了。 印迹是:
public class DealLostBolt extends BaseRichBolt {
private static final long serialVersionUID = 1L;
private static final Logger LOGGER = LoggerFactory.getLogger("DEAL_LOST_BOLT");
private OutputCollector collector;
private KieSession kieSession;
private FactHandle factHandle;
@Override
public void execute(Tuple input) {
// 获取数据
String sentence = (String) input.getValue(0);
LOGGER.info("DealLostBolt获取到的数据:" + sentence);
// 数据转换
PutDataPoint dataPoint = Json.fromJson(PutDataPoint.class, sentence);
KieServices ks = KieServices.Factory.get();
KieContainer kieContainer = ks.getKieClasspathContainer();
kieSession = kieContainer.newKieSession("all-rule");
kieSession.getAgenda().getAgendaGroup("deal-lost").setFocus();
factHandle = kieSession.insert(dataPoint);
kieSession.fireAllRules();
kieSession.delete(factHandle);
collector.emit(new Values(sentence));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("value"));
}
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
}
我是用官方文档创建的kiesession。 错误是:
java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:495) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:460) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$fn__5030$fn__5043$fn__5096.invoke(executor.clj:848) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.1.1.jar:1.1.1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
Caused by: java.lang.NullPointerException
at org.kie.internal.io.ResourceFactory.newByteArrayResource(ResourceFactory.java:66) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.getResource(AbstractKieModule.java:299) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:264) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.addResourceToCompiler(AbstractKieModule.java:259) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieProject.buildKnowledgePackages(AbstractKieProject.java:228) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.AbstractKieModule.createKieBase(AbstractKieModule.java:206) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.createKieBase(KieContainerImpl.java:584) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.getKieBase(KieContainerImpl.java:552) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:680) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.drools.compiler.kie.builder.impl.KieContainerImpl.newKieSession(KieContainerImpl.java:648) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at cn.ennwifi.storm.bolt.DealLostBolt.execute(DealLostBolt.java:52) ~[se-storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.storm.daemon.executor$fn__5030$tuple_action_fn__5032.invoke(executor.clj:729) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__4951.invoke(executor.clj:461) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.disruptor$clojure_handler$reify__4465.onEvent(disruptor.clj:40) ~[storm-core-1.1.1.jar:1.1.1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:482) ~[storm-core-1.1.1.jar:1.1.1]
... 6 more
也许有些东西没有初始化。 但是我在 blot 执行时创建了一个新的 kieservice。 有人可以帮帮我吗?
谢谢!
我在使用 Drools 和 JMH as shaded jar. Drools uses a ServiceRegistry 方法时遇到了类似的问题。这意味着 Drools 库(drools-compiler、kie-ci、drools-decisiontables,...)包含同名的 属性 文件,指示接口的实现他们提供。
阴影 jar 插件通常将(传递的)依赖项ci扁平化到一个 jar 中。对于多次存在的文件,这通常意味着如果没有另外指定,则选择其中一个。对于 ServiceRegistry 属性,我们需要合并所有文件。通常这是通过 ServicesResourceTransformer. This Transformer handles files in META-INF/services/
, but the relevant file for Drools is META-INF/kie.conf
. My problem with JMH could be solved with the AppendingTransformer:
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/kie.conf</resource>
</transformer>
我不是 Storm 的专家,但 Starter 表明它也使用了 shade 插件。我假设您 运行 来自 IDE 的本地集群 - 它不使用阴影 jar。