Paho MQTT客户端本地地址和端口

Paho MQTT client local address and port

我的问题很简单。 我想知道如何修复端口客户端。

根据 Eclipse 文档和 IBM 的文档,用户必须修复代理地址(这是绝对自然的)。但是没有提及如何修复客户端站点端口的方法。 https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.5.0/com.ibm.mq.javadoc.doc/WMQMQxrClasses/com/ibm/micro/client/mqttv3/MqttClient.html

MQTT肯定也在TCP层,所以我觉得可以修复端口。

如果你有想法,请告诉我。 谢谢

不要指定客户端的端口,因为它是由OS选择的,就像任何客户端到服务器的连接一样,就像您也不需要通过 HTTP 连接那样做。

您以tcp://<address>:<port>的形式在URL中指定MQTT代理的IP地址和端口,例如tcp://localhost:4922

下面的代码显示了我如何在 OSGi 上下文中连接 Paho 客户端,其中所有连接参数都是从捆绑上下文中检索的。

private void configureMqtt(BundleContext context) throws IOException, MqttException {
    String broker = context.getProperty("mqtt.broker");

    if (broker == null) {
        throw new ServiceException("Define mqtt.broker");
    }

    String client = context.getProperty("mqtt.clientname");

    if (client == null) {
        throw new ServiceException("Define mqtt.clientname");
    }

    String dir = context.getProperty("mqtt.persistence");

    if (dir == null) {
        dir = "mqtt";
    };

    File directory = context.getDataFile(dir);
    directory.mkdirs();

    logger.config(() -> String.format("MQTT broker: %s, clientname: %s persistence: %s", broker, client, directory.getAbsolutePath()));
    connectOptions = new MqttConnectOptions();
    connectOptions.setWill(GARAGE + "/door", new byte[0], 0, true);

    String ka = context.getProperty("mqtt.keepalive");
    if (ka != null) {
        connectOptions.setKeepAliveInterval(Integer.valueOf(ka));
    }

    persistence = new MqttDefaultFilePersistence(directory.getCanonicalPath());
    mqttClient = new MqttAsyncClient(broker, client, persistence);
    mqttClient.setCallback(this);
    connect();
}

private void connect() {
    logger.fine("Connecting to MQTT broker");

    try {
        IMqttToken token = mqttClient.connect(connectOptions);

        IMqttActionListener listener = new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                logger.log(Level.INFO, "Connected to MQTT broker");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                logger.log(Level.WARNING, "Could not connect to MQTT broker, retrying in 3 seconds", exception);
                service.schedule(this::connect, 3, TimeUnit.SECONDS);
            }
        };

        token.setActionCallback(listener);
    } catch (MqttException e) {
        logger.log(Level.SEVERE, "Cannot reconnect to MQTT broker, giving up", e);
    }
}

正常情况下你不设置TCP连接的源端口,你只是让OS选择一个随机的空闲端口。

如果您修复了源端口,那么在给定的机器上您一次只能 运行 1 个客户端实例,如果连接失败,您必须等待 TCP 堆栈释放在您可以重新连接之前,请再次打开该插座。

如果出于某种原因你真的需要修复源端口那么你可能会写一个自定义的javax.net.SocketFactory implementation that you can hard code the source port. Then pass this in as part of the MQTTConnectOptions对象,但我又一次真的很难来说出这是个好主意的原因。