在 WSO2 CEP 3.1.0 中处理 MQTT 消息

Handling MQTT messages in WSO2 CEP 3.1.0

我有一个场景,其中我有 MQTT 发布者。

MQTT Client --> Message Broker --> WSO2 CEP (JMS input adaptor)

我可以通过哪些方式将已发布的 MQTT 消息作为 WSO2 CEP 的输入?

我尝试使用 ActiveMQ 作为消息代理和 CEP JMS 输入适配器来侦听 JMS 消息,但是 ActiveMQ 将 MQTT 消息转换为 ByteMessage,我相信 CEP 目前还没有处理它,因为 JMS 消息在相同的设置中发布工作正常。
我知道 WSO2 CEP 4.0.0 具有 MQTT 输入适配器的功能。我如何在 CEP 3.1.0 中处理它?

正如您提到的,WSO2 CEP 从 4.0.0 版本开始支持 MQTT。您无法将 WSO2 CEP 4.0.0 MQTT components/features 添加到 CEP 3.1.0,因为输入适配器架构在新版本中有所不同。

我强烈推荐使用 CEP 4.0.0。如果你真的想要在 CEP 3.1.0 中使用它,你将不得不 write a custom event adapters for CEP 3.1.0. You can use refer new version source 来了解一些想法,但适配器实现是不同的。

是的,我们尚未在 CEP 3.1.0 中处理 JMS 输入事件适配器的 ByteMessage 场景。但是,我们已在 CEP 4.0.0(以及随 BAM 2.5.0 发布的 CEP 功能)中修复此问题

我想提供一些选项以使其与 CEP 3.1.0 一起使用。正如 Tharik 提到的,MQTT 输入事件适配器功能可从下一个 CEP 版本 (4.0.0) 获得。以下是可能的选项。

1) 您可以修补 JMS 输入事件适配器并将其作为 CEP 服务器的修补程序包含在内。 (修复在 [1] 中可用)

2) 使用 BAM 2.5.0 中可用的 MQTT 输入事件适配器 jar。 (构建源 [2] 并将 jar 复制到 CEP 的 dropins 文件夹,然后按照 [3])

3) 按照 [4]

中的说明编写您自己的自定义 MQTT 输入事件适配器

[1] https://svn.wso2.org/repos/wso2/carbon/platform/branches/turing/components/event-stream/event-input-adaptor/org.wso2.carbon.event.input.adaptor.jms/1.0.2/src/main/java/org/wso2/carbon/event/input/adaptor/jms/internal/util/JMSMessageListener.java

[2]https://svn.wso2.org/repos/wso2/carbon/platform/branches/turing/components/event-stream/event-input-adaptor/org.wso2.carbon.event.input.adaptor.mqtt/1.0.0

[3]https://docs.wso2.com/display/BAM250/Input+MQTT+Event+Adapter

[4]https://mohanadarshan.wordpress.com/2014/07/14/writing-custom-event-adaptors-in-wso2-cep-3-1-0/

谢谢..