如何订阅 MQTT 主题并在 Eclipse 上打印收到的消息 (Java)

How to subscribe to a MQTT topic and print received messages on Eclipse (Java)

我有一个带恒温器的微控制器,使用 MQTT 协议通过 Raspberry Pi 将其数据发送到我的计算机。 Kura 已安装并在 Raspberry 上运行。

我在 Putty 上接收数据没有问题,但现在我需要在 Eclipse 上接收它以便我可以开发程序。

我使用 Paho 通过 eclipse 使用以下代码成功发布了该主题(这是对另一个主题 Subscribe and Read MQTT Message Using PAHO 的改编):

package publish;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class PublishSemInterface {
MqttClient client;

public PublishSemInterface() {}

public static void main(String[] args) {
    new PublishSemInterface().doDemo();
}

public void doDemo() {
    try {
        client = new MqttClient("tcp://192.168.0.39:1883", "user");
        client.connect();
        MqttMessage message = new MqttMessage();
        message.setPayload("Published message".getBytes());
        client.publish("sensor/temp/out", message);
        client.disconnect();
    } catch (MqttException e) {
        e.printStackTrace();
    }
}
}

但是订阅很痛苦。我尝试使用上面提到的主题的答案,实现 MqttCallback 接口:

public class PublishSemInterface implements MqttCallback

连接客户端后添加setCallback和需要的接口方法(我只需要messageArrived):

client.setCallback(this);

@Override
public void connectionLost(Throwable cause) {}

@Override
public void messageArrived(String topic, MqttMessage message)
    throws Exception {
System.out.println(message);   
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {}

但是没有用。我还尝试使用以下主题的答案:How to read data from MQTT in Eclipse Paho?

public static void main(String[] args) {

    MqttClient client;
    MqttConnectOptions conn;

    try {
        client = new MqttClient("tcp://192.168.0.39:1883", "user");
        client.connect();
        client.setCallback(new MqttCallback() {
            public void connectionLost(Throwable cause) {}

            public void messageArrived(String topic,
                    MqttMessage message)
                            throws Exception {
                System.out.println(message.toString());
            }

            public void deliveryComplete(IMqttDeliveryToken token) {}
        });

        client.subscribe("sensor/temp/in");

    } catch (MqttException e) {
        e.printStackTrace();
    }
}

除了它也不起作用。在这两种情况下,当我 运行 代码时,控制台处于活动状态,但是当微控制器发送数据(出现在 Putty 上)而不是打印数据时,程序终止。看起来好像没有调用 messageArrived 方法。

谁能帮我在 Eclipse 的控制台上订阅和打印?

如您所见:client.publish("sensor/temp/out", message);,您的主题是sensor/temp/out。所以你的订阅者应该订阅相同的主题,而不是这一行:client.subscribe("sensor/temp/in");,尝试订阅主题:sensor/temp/out.

另外,我建议您使用其他 mqtt 选项创建连接。像这样:

MqttClient client = new MqttClient(serverUrl, UUID.randomUUID().toString().replace("-", "")); //clientID needs to be unique and has meaning only for mqtt broker
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("username"); //part of the password_file inside mqtt broker
options.setPassword("password".toCharArray()); //also part of password_file. Username and password might not be needed.
options.setConnectionTimeout(60);
options.setKeepAliveInterval(60); //how often to send PINGREQ messages
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //newest version
client.connect(options);

我已经设法让正在发送的数据出现在 Eclipse 控制台上。看来 ClientId 是错误的,但我还根据我在问题中链接的主题的答案添加了一些修改。这是代码:

private Map<String, Object> properties;

public void updated(Map<String, Object> properties) {
  this.properties = properties;
  String broker   = "";
  String clientId = "";
  String topic   = "";

  if(properties != null && !properties.isEmpty()) {

    broker = (String) properties.get("broker.name");
    clientId = (String) properties.get("clientId.name");
    topic = (String) properties.get("topic.name");

    doDemo(broker, clientId, topic);
  }
}

public void doDemo(String broker, String clientId, String topic) {
  MemoryPersistence persistence = new MemoryPersistence();

  try {
    MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
    MqttConnectOptions connOpts = new MqttConnectOptions();
    connOpts.setCleanSession(true);

    sampleClient.setCallback(new MqttCallback() {
      public void connectionLost(Throwable cause) {}

      public void messageArrived(String topic, MqttMessage message) throws Exception {
        System.out.println("Message: " + message.toString());
      }

      public void deliveryComplete(IMqttDeliveryToken token) {}
    });

    sampleClient.connect(connOpts);
    sampleClient.subscribe(topic);

  } catch(MqttException e) {
    e.printStackTrace();
  }
}