基于 websocket 的 Mqtt 后端

Mqtt back end over websocket

我们在 back-end 中有一个 Mqtt 代理,它被设置为通过 WebSocket 接受连接。当我直接连接到这台机器时一切正常。但是我们想通过 Enterprise integrator 路由所有呼叫。我似乎无法正常工作。后端 websocket 监听路径,而不是根(例如:machine:9999/api/mqtt)。出于测试目的,back-end Mqtt 代理没有实施安全措施。它接受每一个连接,并且都可以订阅和发布。

更改为 axis.xml

<transportReceiver name="mqtt" class="org.apache.axis2.transport.mqtt.MqttListener">
    <parameter locked="false" name="mqttConFactory">
        <parameter locked="false" name="mqtt.server.host.name">workerv2</parameter>
        <parameter name="mqtt.connection.factory">mqttConFactory</parameter>
        <parameter locked="false" name="mqtt.server.port">9026</parameter>
        <parameter locked="false" name="mqtt.client.id">client-id-1234</parameter>
        <parameter locked="false" name="mqtt.topic.name">esb.test</parameter>
    </parameter>
</transportReceiver>

<transportSender name="ws" class="org.wso2.carbon.websocket.transport.WebsocketTransportSender">
    <parameter name="ws.outflow.dispatch.sequence" locked="false">outflowDispatchSeq</parameter>
    <parameter name="ws.outflow.dispatch.fault.sequence" locked="false">outflowFaultSeq</parameter>
</transportSender>

我已经将 jar 文件复制到 libs 文件夹中:org.exlipse.paho.client.mqttv3-1.1.0.jar

我已经按照示例设置了我的序列和 websocket 入站端点 here

我不知道如何解决这个问题,我将 log4j 设置为显示线路日志,但似乎无法从中获取任何信息。我没有看到任何错误,所以我想我必须关闭 (?)

你能帮帮我吗?

编辑

我设法设置了 websocket 到 websocket 的连接。直接连接时,websocket 工作,我可以成功连接。但是当我通过 wso2ei 连接时,我可以看到日志消息并且一切正常。但是所有请求都在 120 秒后超时。我似乎无法为 websockets 调试 headers。任何帮助,将不胜感激。

根据示例我的调度顺序:

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="dispatchSeq" xmlns="http://ws.apache.org/ns/synapse">
    <log level="full">
        <property name="LOGGED_MESSAGE" value="LOGGED"/>
    </log>
    <send>
        <endpoint>
            <address uri="ws://10.100.14.8:9026/api/v2/mqtt"/>
        </endpoint>
    </send>
</sequence>

我的输出序列

<?xml version="1.0" encoding="UTF-8"?>
<sequence name="outDispatchSeq" trace="enable" xmlns="http://ws.apache.org/ns/synapse">
    <log level="full"/>
    <respond/>
</sequence>

我的入站端点

<?xml version="1.0" encoding="UTF-8"?><inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="testMQTT" sequence="dispatchSeq" onError="fault" protocol="ws" suspend="false">
<parameters>
    <parameter name="inbound.ws.port">9091</parameter>
    <parameter name="ws.client.side.broadcast.level">0</parameter>
    <parameter name="ws.outflow.dispatch.sequence">outDispatchSeq</parameter>
    <parameter name="ws.outflow.dispatch.fault.sequence">fault</parameter>
    <parameter name="ws.use.port.offset">false</parameter>
</parameters>

记录消息

TID: [-1] [] [2018-04-24 14:25:14,675]  INFO {org.apache.synapse.mediators.builtin.LogMediator} -  To: , MessageID: urn:uuid:decebbbe-8ad7-4174-adb9-1bc324764275, Direction: request, LOGGED_MESSAGE = LOGGED, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Body/></soapenv:Envelope> {org.apache.synapse.mediators.builtin.LogMediator}
TID: [-1] [] [2018-04-24 14:25:16,177]  WARN {org.apache.synapse.core.axis2.TimeoutHandler} -  Expiring message ID : urn:uuid:26b39fa8-9b5e-44cd-86b3-caa72c30047b; dropping message after GLOBAL_TIMEOUT of : 120 seconds for AnonymousEndpoint, URI : ws://10.100.14.8:9026/api/v2/mqtt, Received through Inbound Endpoint : testMQTT {org.apache.synapse.core.axis2.TimeoutHandler}

我解决了这个问题,websocket 直接连接而不需要 ebs 集成器。