使用 Goroutine 订阅 MQTT 消息
Subscribing to MQTT messages using a Goroutine
我目前有一个 Go 代码可以订阅和打印发布到特定主题的传感器数据。这是我的代码:
package main
import (
"crypto/tls"
"flag"
"fmt"
//"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
//fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())
fmt.Printf("%s\n", message.Payload())
}
func main() {
//MQTT.DEBUG = log.New(os.Stdout, "", 0)
//MQTT.ERROR = log.New(os.Stdout, "", 0)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
hostname, _ := os.Hostname()
server := flag.String("server", "tcp://test.mosquitto.org:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883")
topic := flag.String("topic", "topic/sensorTemperature", "Topic to subscribe to")
qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection")
username := flag.String("username", "", "A username to authenticate to the MQTT server")
password := flag.String("password", "", "Password to match username")
flag.Parse()
connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true)
if *username != "" {
connOpts.SetUsername(*username)
if *password != "" {
connOpts.SetPassword(*password)
}
}
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
connOpts.SetTLSConfig(tlsConfig)
connOpts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
panic(token.Error())
}
}
client := MQTT.NewClient(connOpts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
fmt.Printf("Connected to %s\n", *server)
}
<-c
}
我不想像这样订阅消息,而是想将订阅的代码部分放在 Goroutine 中。我希望能够打电话给 go func onMessageReceived
。如果在 c.Subscribe
中调用此函数,我该怎么做?以及如何添加 sync.WaitGroup
参数?谢谢。
由于您将函数作为参数传递给另一个函数,因此您无法控制它的调用方式。但是,您确实可以完全控制函数内部发生的事情——这意味着您可以在那里启动一个 goroutine:
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
go func() {
fmt.Printf("%s\n", message.Payload())
}()
}
因此,onMessageReceived
本身仍会被 MQTT 同步调用,但它只会启动一个 goroutine 并立即 return。您还可以定义一个单独的函数并使用 go
而不是匿名函数来调用它:
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
go messageHandler(client, message)
}
func messageHandler(client MQTT.Client, message MQTT.Message) {
fmt.Printf("%s\n", message.Payload())
}
这只是您希望如何组织代码的问题。如果它是一个简短的处理程序,我可能会坚持使用匿名函数(足够短,您可以在一个屏幕上看到整个匿名函数);对于更长的函数,我会把它分解或分解成一个命名函数。
因为你不能传入任何额外的参数,如果你想使用一个WaitGroup
,它必须是全局的:
var wg = new(sync.WaitGroup)
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("%s\n", message.Payload())
}()
}
我目前有一个 Go 代码可以订阅和打印发布到特定主题的传感器数据。这是我的代码:
package main
import (
"crypto/tls"
"flag"
"fmt"
//"log"
"os"
"os/signal"
"strconv"
"syscall"
"time"
MQTT "github.com/eclipse/paho.mqtt.golang"
)
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
//fmt.Printf("Received message on topic: %s\nMessage: %s\n", message.Topic(), message.Payload())
fmt.Printf("%s\n", message.Payload())
}
func main() {
//MQTT.DEBUG = log.New(os.Stdout, "", 0)
//MQTT.ERROR = log.New(os.Stdout, "", 0)
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
hostname, _ := os.Hostname()
server := flag.String("server", "tcp://test.mosquitto.org:1883", "The full url of the MQTT server to connect to ex: tcp://127.0.0.1:1883")
topic := flag.String("topic", "topic/sensorTemperature", "Topic to subscribe to")
qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")
clientid := flag.String("clientid", hostname+strconv.Itoa(time.Now().Second()), "A clientid for the connection")
username := flag.String("username", "", "A username to authenticate to the MQTT server")
password := flag.String("password", "", "Password to match username")
flag.Parse()
connOpts := MQTT.NewClientOptions().AddBroker(*server).SetClientID(*clientid).SetCleanSession(true)
if *username != "" {
connOpts.SetUsername(*username)
if *password != "" {
connOpts.SetPassword(*password)
}
}
tlsConfig := &tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}
connOpts.SetTLSConfig(tlsConfig)
connOpts.OnConnect = func(c MQTT.Client) {
if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil {
panic(token.Error())
}
}
client := MQTT.NewClient(connOpts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
} else {
fmt.Printf("Connected to %s\n", *server)
}
<-c
}
我不想像这样订阅消息,而是想将订阅的代码部分放在 Goroutine 中。我希望能够打电话给 go func onMessageReceived
。如果在 c.Subscribe
中调用此函数,我该怎么做?以及如何添加 sync.WaitGroup
参数?谢谢。
由于您将函数作为参数传递给另一个函数,因此您无法控制它的调用方式。但是,您确实可以完全控制函数内部发生的事情——这意味着您可以在那里启动一个 goroutine:
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
go func() {
fmt.Printf("%s\n", message.Payload())
}()
}
因此,onMessageReceived
本身仍会被 MQTT 同步调用,但它只会启动一个 goroutine 并立即 return。您还可以定义一个单独的函数并使用 go
而不是匿名函数来调用它:
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
go messageHandler(client, message)
}
func messageHandler(client MQTT.Client, message MQTT.Message) {
fmt.Printf("%s\n", message.Payload())
}
这只是您希望如何组织代码的问题。如果它是一个简短的处理程序,我可能会坚持使用匿名函数(足够短,您可以在一个屏幕上看到整个匿名函数);对于更长的函数,我会把它分解或分解成一个命名函数。
因为你不能传入任何额外的参数,如果你想使用一个WaitGroup
,它必须是全局的:
var wg = new(sync.WaitGroup)
func onMessageReceived(client MQTT.Client, message MQTT.Message) {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Printf("%s\n", message.Payload())
}()
}