MQTT 连接到 ActiveMQ TOPIC 在 WSO2 CEP 的事件接收器中花费大量时间
MQTT connect to ActiveMQ TOPIC take lots of time in Event Receiver of WSO2 CEP
更新
在carbon-analytics-common项目的MQTTAdapterListener源码中是这样的link
run()
将使线程休眠,我相信这就是 MQTT 连接需要这么长时间的原因。
@Override
public void run() {
while (!connectionSucceeded) {
try {
MQTTEventAdapterConstants.initialReconnectDuration = MQTTEventAdapterConstants.initialReconnectDuration
* MQTTEventAdapterConstants.reconnectionProgressionFactor;
Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration);
startListener();
connectionSucceeded = true;
log.info("MQTT Connection successful");
} catch (InterruptedException e) {
log.error("Interruption occurred while waiting for reconnection", e);
} catch (MqttException e) {
log.error("MQTT Exception occurred when starting listener", e);
}
}
}
initialReconnectDuration和reconnectionProgressionFactor如下MQTTEventAdapterConstants
public static int initialReconnectDuration = 10000;
public static final int reconnectionProgressionFactor = 2;
如果我有 12 个带有 MQTT 的接收器,第 12 个将休眠 40960 秒。好像没有办法修改这两个常量?有什么办法可以解决这个问题吗?为什么MQTT连接会这样设置为休眠线程?
我们使用的是 WSO2 CEP 版本 4.0.0 和 ActiveMQ 版本 5.13.0。部署的事件接收器如下
<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER"
statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="mqtt">
<property name="clientId">5718a6b1851cb3474c6f03c2</property>
<property name="topic">PROBE_DATA_571844f5851cb3474c6f0391_57184581851cb3474c6f0394_Topic</property>
<property name="cleanSession">false</property>
<property name="url">tcp://127.0.0.1:1883</property>
</from>
<mapping customMapping="enable" type="json">
<property>
<from jsonPath="$.event.payloadData.dyna.Speed3"/>
<to name="speed" type="double"/>
</property>
</mapping>
<to streamName="5718a6b1851cb3474c6f03c2_PROBE_DATA_IS" version="1.0.0"/>
</eventReceiver>
clientId是我们数据库内部这个事件执行计划的一个随机对象Id。该主题存在于 ActiveMQ 中,我确信消息已排入其中。清洁会话 属性 已针对持久订阅设置为 false。
但是,无论我们重新启动 WSO2 CEP 还是部署新的接收器,接收器总是需要花费很多时间来连接到 ActiveMQ TOPIC。
[2016-04-24 01:07:59,018] INFO {org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService} - Starting polling event receivers
[2016-04-24 01:07:59,019] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66bcc7a22442328cd229a_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,020] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,021] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 57144fa0851cb33590672418_X_DATA_RECEIVER
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714b5f4851cb3338c26a1cc_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571b9ef4851cb356e04ec90b_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714a305851cb3338c26a1c4_DELIVERY_ORDER_RECEI
VER
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f6b03e7a22440c30855015_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f665f27a22442328cd2299_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a5d7851cb3474c6f03bf_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,025] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56ef98137a22442e28360c0b_DELIVERY_ORDER_RECEI
VER
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571747f2851cb3338c26a1da_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66c657a22442328cd229c_PROBE_DATA_RECEIVER
[2016-04-24 01:08:19,044] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:08:39,041] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:09:19,034] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:10:39,037] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:13:19,046] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:18:39,035] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:29:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:50:39,036] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 02:33:19,039] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 03:58:39,043] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 06:49:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 12:30:39,040] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
有12个接收者,WSO2在01:07:59开始轮询。大约10分钟后,12个接收器中只有6个连接成功,其他的大约需要10个小时或更长时间。
有谁知道为什么WSO2 CEP接收器的MQTT连接这么慢?
没有提供开箱即用的方法来配置重新连接持续时间和累进因子值。但是由于您已经弄清楚了代码,您可以修补该组件(从配置文件中读取这些值或通过适配器 ui 配置它或只是硬编码值)并将修补程序应用为 described here 来克服这个限制。
更新
在carbon-analytics-common项目的MQTTAdapterListener源码中是这样的link
run()
将使线程休眠,我相信这就是 MQTT 连接需要这么长时间的原因。
@Override
public void run() {
while (!connectionSucceeded) {
try {
MQTTEventAdapterConstants.initialReconnectDuration = MQTTEventAdapterConstants.initialReconnectDuration
* MQTTEventAdapterConstants.reconnectionProgressionFactor;
Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration);
startListener();
connectionSucceeded = true;
log.info("MQTT Connection successful");
} catch (InterruptedException e) {
log.error("Interruption occurred while waiting for reconnection", e);
} catch (MqttException e) {
log.error("MQTT Exception occurred when starting listener", e);
}
}
}
initialReconnectDuration和reconnectionProgressionFactor如下MQTTEventAdapterConstants
public static int initialReconnectDuration = 10000;
public static final int reconnectionProgressionFactor = 2;
如果我有 12 个带有 MQTT 的接收器,第 12 个将休眠 40960 秒。好像没有办法修改这两个常量?有什么办法可以解决这个问题吗?为什么MQTT连接会这样设置为休眠线程?
我们使用的是 WSO2 CEP 版本 4.0.0 和 ActiveMQ 版本 5.13.0。部署的事件接收器如下
<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER"
statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
<from eventAdapterType="mqtt">
<property name="clientId">5718a6b1851cb3474c6f03c2</property>
<property name="topic">PROBE_DATA_571844f5851cb3474c6f0391_57184581851cb3474c6f0394_Topic</property>
<property name="cleanSession">false</property>
<property name="url">tcp://127.0.0.1:1883</property>
</from>
<mapping customMapping="enable" type="json">
<property>
<from jsonPath="$.event.payloadData.dyna.Speed3"/>
<to name="speed" type="double"/>
</property>
</mapping>
<to streamName="5718a6b1851cb3474c6f03c2_PROBE_DATA_IS" version="1.0.0"/>
</eventReceiver>
clientId是我们数据库内部这个事件执行计划的一个随机对象Id。该主题存在于 ActiveMQ 中,我确信消息已排入其中。清洁会话 属性 已针对持久订阅设置为 false。
但是,无论我们重新启动 WSO2 CEP 还是部署新的接收器,接收器总是需要花费很多时间来连接到 ActiveMQ TOPIC。
[2016-04-24 01:07:59,018] INFO {org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService} - Starting polling event receivers
[2016-04-24 01:07:59,019] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66bcc7a22442328cd229a_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,020] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,021] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 57144fa0851cb33590672418_X_DATA_RECEIVER
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714b5f4851cb3338c26a1cc_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,022] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571b9ef4851cb356e04ec90b_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5714a305851cb3338c26a1c4_DELIVERY_ORDER_RECEI
VER
[2016-04-24 01:07:59,023] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f6b03e7a22440c30855015_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f665f27a22442328cd2299_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 5718a5d7851cb3474c6f03bf_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,025] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56ef98137a22442e28360c0b_DELIVERY_ORDER_RECEI
VER
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 571747f2851cb3338c26a1da_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,026] INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} - Connecting receiver 56f66c657a22442328cd229c_PROBE_DATA_RECEIVER
[2016-04-24 01:08:19,044] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:08:39,041] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:09:19,034] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:10:39,037] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:13:19,046] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:18:39,035] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:29:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 01:50:39,036] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 02:33:19,039] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 03:58:39,043] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 06:49:19,038] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
[2016-04-24 12:30:39,040] INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} - MQTT Connection successful
有12个接收者,WSO2在01:07:59开始轮询。大约10分钟后,12个接收器中只有6个连接成功,其他的大约需要10个小时或更长时间。
有谁知道为什么WSO2 CEP接收器的MQTT连接这么慢?
没有提供开箱即用的方法来配置重新连接持续时间和累进因子值。但是由于您已经弄清楚了代码,您可以修补该组件(从配置文件中读取这些值或通过适配器 ui 配置它或只是硬编码值)并将修补程序应用为 described here 来克服这个限制。