Mqtt:在服务器端持久化消息
Mqtt: Persist message on server side
我们决定在我们的移动应用程序中使用 mqtt 协议作为聊天模块。我也想在服务器端保存主题消息。但我看到,mqtt 客户端在这里是全局的,所以一种方法是我必须将 mqtt 客户端的单个实例订阅到所有主题并将消息保存在数据库中。但这样做是正确的方法吗?我只是担心它。
private void buildClient(){
log.debug("Connecting... "+CLIENT_ID);
try {
mqttClient = new MqttClient(envConfiguration.getBrokerUrl(), CLIENT_ID);
} catch (MqttException e) {
log.debug("build client stopped due to "+e.getCause());
}
chatCallback = new ChatCallback();
mqttClient.setCallback(chatCallback);
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(false);
}
@Override
public void connect() {
if(mqttClient == null || !mqttClient.getClientId().equals(CLIENT_ID)){
buildClient();
}
boolean tryConnecting = true;
while(tryConnecting){
try {
mqttClient.connect(mqttConnectOptions);
} catch (Exception e) {
log.debug("connection attempt failed "+ e.getCause() + " trying...");
}
if(mqttClient.isConnected()){
tryConnecting = false;
}else{
pause();
}
}
}
@Override
public void publish() {
boolean publishCallCompletedErrorFree = false;
while (!publishCallCompletedErrorFree) {
try {
mqttClient.publish(TOPIC, "hello".getBytes(), 1, true);
publishCallCompletedErrorFree = true;
} catch (Exception e) {
log.debug("error occured while publishing "+e.getCause());
}finally{
pause();
}
}
}
@Override
public void subscribe() {
if(mqttClient != null && mqttClient.isConnected()){
try {
mqttClient.subscribe(TOPIC, 2);
} catch (MqttException e) {
log.debug("subscribing error.."+e.getCause());
}
}
}
@Override
public void disconnect() {
System.out.println(this.mqttClient.isConnected());
try {
mqttClient.disconnect();
log.debug("disconnected..");
} catch (MqttException e) {
log.debug("erro occured while disconneting.."+e.getCause());
}
}
解决这个问题有两种可能:
- 编写一个使用通配符(MQTT 中的#)订阅所有主题的 MQTT 客户端
- 编写一个代理插件来为您完成这项工作,具体取决于您使用的代理实施
HiveMQ website 中很好地描述了如何实施这两个选项,还描述了第一个选项的限制。
我们决定在我们的移动应用程序中使用 mqtt 协议作为聊天模块。我也想在服务器端保存主题消息。但我看到,mqtt 客户端在这里是全局的,所以一种方法是我必须将 mqtt 客户端的单个实例订阅到所有主题并将消息保存在数据库中。但这样做是正确的方法吗?我只是担心它。
private void buildClient(){
log.debug("Connecting... "+CLIENT_ID);
try {
mqttClient = new MqttClient(envConfiguration.getBrokerUrl(), CLIENT_ID);
} catch (MqttException e) {
log.debug("build client stopped due to "+e.getCause());
}
chatCallback = new ChatCallback();
mqttClient.setCallback(chatCallback);
mqttConnectOptions = new MqttConnectOptions();
mqttConnectOptions.setCleanSession(false);
}
@Override
public void connect() {
if(mqttClient == null || !mqttClient.getClientId().equals(CLIENT_ID)){
buildClient();
}
boolean tryConnecting = true;
while(tryConnecting){
try {
mqttClient.connect(mqttConnectOptions);
} catch (Exception e) {
log.debug("connection attempt failed "+ e.getCause() + " trying...");
}
if(mqttClient.isConnected()){
tryConnecting = false;
}else{
pause();
}
}
}
@Override
public void publish() {
boolean publishCallCompletedErrorFree = false;
while (!publishCallCompletedErrorFree) {
try {
mqttClient.publish(TOPIC, "hello".getBytes(), 1, true);
publishCallCompletedErrorFree = true;
} catch (Exception e) {
log.debug("error occured while publishing "+e.getCause());
}finally{
pause();
}
}
}
@Override
public void subscribe() {
if(mqttClient != null && mqttClient.isConnected()){
try {
mqttClient.subscribe(TOPIC, 2);
} catch (MqttException e) {
log.debug("subscribing error.."+e.getCause());
}
}
}
@Override
public void disconnect() {
System.out.println(this.mqttClient.isConnected());
try {
mqttClient.disconnect();
log.debug("disconnected..");
} catch (MqttException e) {
log.debug("erro occured while disconneting.."+e.getCause());
}
}
解决这个问题有两种可能:
- 编写一个使用通配符(MQTT 中的#)订阅所有主题的 MQTT 客户端
- 编写一个代理插件来为您完成这项工作,具体取决于您使用的代理实施
HiveMQ website 中很好地描述了如何实施这两个选项,还描述了第一个选项的限制。