HiveMQ Java Blocking Client Subscriber is not consuming any message

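I'm using the HiveMQ Java client to connect to a HiveMQ broker. The blocking client subscriber is not consuming any messages, although publishing and subscribing with MQTTBox works fine. Here is the code. I'm following the HiveMQ Java Client documentation.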

import java.util.concurrent.TimeUnit;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient.Mqtt3Publishes;

public class MQTTMain {

    public static void main(String[] args) {
        Mqtt3BlockingClient pubClient = MqttClient.builder()
                .useMqttVersion3()
                .identifier("pub")
                .serverHost("hostaddress")
                .serverPort(1883)
                .buildBlocking();
        Mqtt3BlockingClient subClient = MqttClient.builder()
                .useMqttVersion3()
                .identifier("sub")
                .serverHost("hostaddress")
                .serverPort(1883)
                .buildBlocking();

        pubClient.connectWith().keepAlive(10000).send();
        publish(pubClient, "test/topic", "test");
        subClient.connectWith().keepAlive(10000).send();
        subscribe(subClient, "test/topic");
        while (true) {
        }
    }

    public static void subscribe(Mqtt3BlockingClient client, String topic) {
        try (final Mqtt3Publishes publishes =
                     client.publishes(MqttGlobalPublishFilter.ALL)) {
            try {
                publishes.receive(1, TimeUnit.SECONDS)
                        .ifPresent(System.out::println);
                publishes.receive(10000, TimeUnit.MILLISECONDS)
                        .ifPresent(System.out::println);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        } catch (Exception e) {
            // TODO: handle exception
        }

        client.subscribeWith()
                .topicFilter(topic)
                .qos(MqttQos.AT_LEAST_ONCE)
                .send();
    }

    public static void publish(Mqtt3BlockingClient client, String topic,
                               String payload) {
        client.publishWith()
                .topic(topic)
                .qos(MqttQos.AT_LEAST_ONCE)
                .payload(payload.getBytes())
                .send();
    }
}

Maven dependency:

<!-- MQTT Client -->
<dependency>
    <groupId>com.hivemq</groupId>
    <artifactId>hivemq-mqtt-client</artifactId>
    <version>1.0.0</version>
</dependency>

Am I missing something? Any pointers would be very helpful.

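You have to subscribe before you can receive messages.

Mqtt3Publishes.receive is a blocking operation: it waits until a message arrives or, in the overload that takes a timeout, until the timeout expires. Your code subscribes only after the receive calls, so both of them time out without ever seeing a message. Note also that your publisher sends its message before the subscriber has connected and subscribed; a non-retained message published at that point is not delivered to the later subscription, so publish after subscribing. You should do it like this: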

Mqtt3BlockingClient client = Mqtt3Client.builder()
        .identifier("sub")
        .serverHost("hostaddress")
        .serverPort(1883)
        .buildBlocking();

client.connect();

try (final Mqtt3BlockingClient.Mqtt3Publishes publishes = 
             client.publishes(MqttGlobalPublishFilter.ALL)) {

    client.subscribeWith().topicFilter("test/topic").qos(MqttQos.AT_LEAST_ONCE).send();

    publishes.receive(1, TimeUnit.SECONDS).ifPresent(System.out::println);
}
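
To make the ordering issue described above easier to see without a broker, here is a minimal sketch using only the Java standard library. ToyBroker, receive, publishThenSubscribe and subscribeThenPublish are hypothetical names invented for this illustration; they are not part of the HiveMQ client, and the toy broker only models the simple case of non-retained messages with no stored session.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SubscribeOrderDemo {

    // Hypothetical in-memory stand-in for a broker (not a HiveMQ class).
    // Assumption: no retained messages and no persistent sessions.
    static final class ToyBroker {
        private final Map<String, BlockingQueue<String>> inboxes = new ConcurrentHashMap<>();

        // Registers a subscription and returns the inbox that collects its messages.
        BlockingQueue<String> subscribe(String topic) {
            return inboxes.computeIfAbsent(topic, t -> new LinkedBlockingQueue<>());
        }

        // Delivers the payload to the topic's inbox, or drops it if nobody has subscribed yet.
        void publish(String topic, String payload) {
            BlockingQueue<String> inbox = inboxes.get(topic);
            if (inbox != null) {
                inbox.add(payload);
            }
        }
    }

    // Blocks until a message arrives or the timeout expires, like a receive call with a timeout.
    static Optional<String> receive(BlockingQueue<String> inbox, long timeout, TimeUnit unit) {
        try {
            return Optional.ofNullable(inbox.poll(timeout, unit));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        }
    }

    // Order used in the question: the message is published before any subscription exists.
    static Optional<String> publishThenSubscribe() {
        ToyBroker broker = new ToyBroker();
        broker.publish("test/topic", "test");
        BlockingQueue<String> inbox = broker.subscribe("test/topic");
        return receive(inbox, 200, TimeUnit.MILLISECONDS);
    }

    // Corrected order: subscribe first, then publish, then receive.
    static Optional<String> subscribeThenPublish() {
        ToyBroker broker = new ToyBroker();
        BlockingQueue<String> inbox = broker.subscribe("test/topic");
        broker.publish("test/topic", "test");
        return receive(inbox, 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("publish, then subscribe: " + publishThenSubscribe());
        System.out.println("subscribe, then publish: " + subscribeThenPublish());
    }
}
```

In this model the first call returns an empty Optional after the timeout, while the second returns the payload. With the real client the same order applies: connect the subscriber, subscribe, publish, and only then call receive.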