MQTT 是如何为多模块增长的?

Paho MQTT golang for multiple modules?

我正在用 golang 为 mqtt 模块编写微服务。该模块将同时被不同的功能使用。我使用 Grpc 作为传输层。 我做了一个连接功能,就是这样..

func Connect() { //it would be Connect(payload1 struct,topic string)

    deviceID := flag.String("device", "handler-1", "GCP Device-Id")
    bridge := struct {
        host *string
        port *string
    }{
        flag.String("mqtt_host", "", "MQTT Bridge Host"),
        flag.String("mqtt_port", "", "MQTT Bridge Port"),
    }
    projectID := flag.String("project", "", "GCP Project ID")
    registryID := flag.String("registry", "", "Cloud IoT Registry ID (short form)")
    region := flag.String("region", "", "GCP Region")
    certsCA := flag.String("ca_certs", "", "Download https://pki.google.com/roots.pem")
    privateKey := flag.String("private_key", "", "Path to private key file")

    server := fmt.Sprintf("ssl://%v:%v", *bridge.host, *bridge.port)
    topic := struct {
        config    string
        telemetry string
    }{
        config:    fmt.Sprintf("/devices/%v/config", *deviceID),
        telemetry: fmt.Sprintf("/devices/%v/events/topic", *deviceID),
    }
    qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
    clientid := fmt.Sprintf("projects/%v/locations/%v/registries/%v/devices/%v",
        *projectID,
        *region,
        *registryID,
        *deviceID,
    )
    log.Println("[main] Loading Google's roots")
    certpool := x509.NewCertPool()
    pemCerts, err := ioutil.ReadFile(*certsCA)
    if err == nil {
        certpool.AppendCertsFromPEM(pemCerts)
    }

    log.Println("[main] Creating TLS Config")
    config := &tls.Config{
        RootCAs:            certpool,
        ClientAuth:         tls.NoClientCert,
        ClientCAs:          nil,
        InsecureSkipVerify: true,
        Certificates:       []tls.Certificate{},
        MinVersion:         tls.VersionTLS12,
    }

    flag.Parse()

    connOpts := MQTT.NewClientOptions().
        AddBroker(server).
        SetClientID(clientid).
        SetAutoReconnect(true).
        SetPingTimeout(10 * time.Second).
        SetKeepAlive(10 * time.Second).
        SetDefaultPublishHandler(onMessageReceived).
        SetConnectionLostHandler(connLostHandler).
        SetReconnectingHandler(reconnHandler).
        SetTLSConfig(config)
    connOpts.SetUsername("unused")
    ///JWT Generation Starts from Here
    token := jwt.New(jwt.SigningMethodES256)
    token.Claims = jwt.StandardClaims{
        Audience:  *projectID,
        IssuedAt:  time.Now().Unix(),
        ExpiresAt: time.Now().Add(24 * time.Hour).Unix(),
    }
    //Reading key file
    log.Println("[main] Load Private Key")
    keyBytes, err := ioutil.ReadFile(*privateKey)
    if err != nil {
        log.Fatal(err)
    }
    //Parsing key from file
    log.Println("[main] Parse Private Key")
    key, err := jwt.ParseECPrivateKeyFromPEM(keyBytes)
    if err != nil {
        log.Fatal(err)
    }
    //Signing JWT with private key
    log.Println("[main] Sign String")
    tokenString, err := token.SignedString(key)
    if err != nil {
        log.Fatal(err)
    }
    //JWT Generation Ends here

    connOpts.SetPassword(tokenString)
    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic.config, byte(*qos), nil); token.Wait() && token.Error() != nil {
            log.Fatal(token.Error())
        }
    }

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        fmt.Printf("Not Connected..Retrying...  %s\n", server)
    } else {
        fmt.Printf("Connected to %s\n", server)
    }

}

我在 main.go

的 go 例程中调用这个函数
func main() {
    fmt.Println("Server started at port 5005")
    lis, err := net.Listen("tcp", "0.0.0.0:5005")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    //Creating keepAlive channel for mqttt subscribe
    keepAlive := make(chan os.Signal)
    defer close(keepAlive)
    go func() {
        //checking for internet connection
        for !IsOnline() {
            fmt.Println("No Internet Connection..Retrying")
            //looking for internet connection after every 8 seconds
            time.Sleep(8 * time.Second)
        }
        fmt.Println("Internet connected...connecting to mqtt broker")
        repositories.Connect()
        //looking for interupt(Ctrl+C)
        value := <-keepAlive
        //If Ctrl+C is pressed then exit the application
        if value == os.Interrupt {
            fmt.Printf("Exiting the application")
            os.Exit(3)
        }
    }()
    s := grpc.NewServer()
    MqttRepository := repositories.MqttRepository()
    // It creates a new gRPC server instance
    rpc.NewMqttServer(s, MqttRepository)
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)

    }
}

func IsOnline() bool {
    timeout := time.Duration(5000 * time.Millisecond)
    client := http.Client{
        Timeout: timeout,
    }
    //default url to check connection is http://google.com
    _, err := client.Get("https://google.com")

    if err != nil {
        return false
    }

    return true
}

我在 main 中使用 go 例程,以便应用程序在每次启动时启动。

现在我想使用这个MQTT Connect功能来发布来自其他不同功能的数据。

例如函数 A 可以像 Connect(payload1,topic1) 一样调用它,函数 B 可以像 Connect(payload2,topic2) 一样调用它,然后这个函数应该处理将数据发布到云中。

我是否应该只在这个 Connect 函数中添加主题和负载,然后从另一个函数调用它?或者有没有可能我可以 return 或将客户端导出为全局的,然后在另一个函数或 go routine 中使用它?如果我的问题听起来很愚蠢,我很抱歉..我不是golang专家..

Now I want to use this MQTT Connect function to publish the data from other different functions.

我怀疑我可能误解了您在这里尝试做的事情,但除非您有建立多个连接的特定原因,否则最好连接一次,然后使用该单一连接发布多条消息。每次发送消息时建立连接都会出现一些问题,包括:

  • 建立连接需要时间并会产生一些网络流量(TLS 握手等)。
  • 给定的 ClientID 只能有一个活动连接(如果您建立第二个连接,代理将关闭前一个连接)。
  • 库不会自动断开连接 - 您需要在发布后调用 Disconnect
  • 由于连接断开,传入消息可能会丢失(请注意 CleanSession 默认为 true)。

Should I just add the topic and payload in this Connect function and then call it from another function?

如上所述,首选方法是连接一次,然后通过一个连接发布多条消息。 Client 设计为线程安全的,因此您可以传递它并从多个 go 例程调用 Publish。您也可以使用 AutoConnect option (which you are) if you want the library to manage the connection (there is also a SetConnectRetry 函数)但请记住,如果 link 在您尝试发送时出现故障,则不会重试 QOS 0 消息。

我建议你的连接函数 return 客户端(即 func Connect() mqtt.Client),然后使用该客户端发布消息(你可以将它存储在某个地方或者只是传递它;我会建议将其添加到您的 grpc 服务器结构中)。

我想如果您需要连接到特定的 clientid 以便发送到所需的主题,您可能需要建立多个连接(但通常您会给您的服务器连接访问权限广泛的主题)。这将需要一些工作来确保您不会尝试同时使用相同的客户端 ID 建立多个连接,并且根据您的要求接收传入的消息。

一些补充说明:

  • 如果您使用 AutoConnect and SetConnectRetry you can simplify your code code (and just use IsConnectionOpen() 检查连接是否正常,则不需要 IsOnline())。
  • spec states“服务器必须允许长度在 1 到 23 个 UTF-8 编码字节之间的 ClientId”- 看起来你的比那个长(我没有使用 GCP,它可能support/require 更长的客户端 ID)。
  • 您在生产中应该不需要 InsecureSkipVerify