向 nats 组发出请求
make a request to a nats group
这是我的 sub.go 示例:
package main
import (
"fmt"
nats "github.com/nats-io/nats.go"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
nc, _ := nats.Connect("127.0.0.1:4222")
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
c.QueueSubscribe("subject_toto", "queue_titi", func(_, reply string, message *Message) {
fmt.Printf("%+v\n", message)
var response Response
response.Msg = "message received"
response.Status = "Ok"
c.Publish("reply", response)
c.Flush()
})
c.Flush()
}
func main() {
fmt.Println("begin")
go start()
defer c.Close()
fmt.Scanln()
nc.Drain()
// Close connection
nc.Close()
fmt.Println("done")
}
它运行完美,所以现在我想向这个队列发布一条消息,这是我的 pub.go:
package main
import (
"fmt"
nats "github.com/nats-io/nats.go"
"time"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
var err error
var message Message
var response Response
nc, _ := nats.Connect("127.0.0.1:4222")
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
message.Status = "Ok"
message.Msg = "hello"
err = c.Request("subject_toto", message, &response, 6*time.Second)
if err != nil {
fmt.Printf("%+v\n", err)
}
fmt.Printf("%+v\n","response")
fmt.Printf("%+v\n", response)
defer c.Close()
}
func main() {
fmt.Println("begin")
start()
fmt.Println("done")
}
但是当我尝试发布到它时,我得到一个空的响应:
response {Status: Msg:}
答案的开头之一似乎使用 PublishRequest,但我似乎只能将字符串发送到服务器,而不是结构。
这是 sub 的功能版本:
package main
import (
"encoding/json"
"fmt"
nats "github.com/nats-io/nats.go"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
nc, _ := nats.Connect("127.0.0.1:4222")
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
c.QueueSubscribe("subject_toto", "queue_titi",func(msg *nats.Msg) {
var message Message
err := json.Unmarshal([]byte(msg.Data), &message)
if err != nil {
fmt.Printf("%+v\n", err)
}
fmt.Printf("%+v\n", "message from pub")
fmt.Printf("%+v\n", message)
var response Response
response.Msg = "message received"
response.Status = "Ok"
fmt.Printf("%+v\n", "response to sub")
fmt.Printf("%+v\n", response)
byteConfApi, err2 := json.Marshal(response)
if err2 != nil {
fmt.Printf("%+v\n", err2)
}
msg.Respond(byteConfApi)
c.Flush()
})
}
func main(){
fmt.Println("begin")
go start()
defer c.Close()
fmt.Scanln()
nc.Drain()
// Close connection
nc.Close()
fmt.Println("done")
}
这一行有问题:
c.Publish("reply", response)
当您发送请求并且您的订阅者将要回复时,应该有一个称为收件箱的“回复主题”。此收件箱在处理函数的 reply
arg 中设置。
因此您必须将您的回复发布到 QueueSubscribe()
中处理函数的 reply
arg 返回的主题中,因此将该行更改为:
c.Publish(reply, response)
reply arg 的值对于通信很重要,是这样的:_INBOX.bw5EtJShBTI9OQdvxFOBlz.VxsGBcjH
这是我的 sub.go 示例:
package main
import (
"fmt"
nats "github.com/nats-io/nats.go"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
nc, _ := nats.Connect("127.0.0.1:4222")
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
c.QueueSubscribe("subject_toto", "queue_titi", func(_, reply string, message *Message) {
fmt.Printf("%+v\n", message)
var response Response
response.Msg = "message received"
response.Status = "Ok"
c.Publish("reply", response)
c.Flush()
})
c.Flush()
}
func main() {
fmt.Println("begin")
go start()
defer c.Close()
fmt.Scanln()
nc.Drain()
// Close connection
nc.Close()
fmt.Println("done")
}
它运行完美,所以现在我想向这个队列发布一条消息,这是我的 pub.go:
package main
import (
"fmt"
nats "github.com/nats-io/nats.go"
"time"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
var err error
var message Message
var response Response
nc, _ := nats.Connect("127.0.0.1:4222")
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
message.Status = "Ok"
message.Msg = "hello"
err = c.Request("subject_toto", message, &response, 6*time.Second)
if err != nil {
fmt.Printf("%+v\n", err)
}
fmt.Printf("%+v\n","response")
fmt.Printf("%+v\n", response)
defer c.Close()
}
func main() {
fmt.Println("begin")
start()
fmt.Println("done")
}
但是当我尝试发布到它时,我得到一个空的响应:
response {Status: Msg:}
答案的开头之一似乎使用 PublishRequest,但我似乎只能将字符串发送到服务器,而不是结构。
这是 sub 的功能版本:
package main
import (
"encoding/json"
"fmt"
nats "github.com/nats-io/nats.go"
)
type Message struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
type Response struct {
Status string `json:"Status"`
Msg string `json:"Msg"`
}
var nc *nats.Conn
var c *nats.EncodedConn
func start(){
nc, _ := nats.Connect("127.0.0.1:4222")
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
c.QueueSubscribe("subject_toto", "queue_titi",func(msg *nats.Msg) {
var message Message
err := json.Unmarshal([]byte(msg.Data), &message)
if err != nil {
fmt.Printf("%+v\n", err)
}
fmt.Printf("%+v\n", "message from pub")
fmt.Printf("%+v\n", message)
var response Response
response.Msg = "message received"
response.Status = "Ok"
fmt.Printf("%+v\n", "response to sub")
fmt.Printf("%+v\n", response)
byteConfApi, err2 := json.Marshal(response)
if err2 != nil {
fmt.Printf("%+v\n", err2)
}
msg.Respond(byteConfApi)
c.Flush()
})
}
func main(){
fmt.Println("begin")
go start()
defer c.Close()
fmt.Scanln()
nc.Drain()
// Close connection
nc.Close()
fmt.Println("done")
}
这一行有问题:
c.Publish("reply", response)
当您发送请求并且您的订阅者将要回复时,应该有一个称为收件箱的“回复主题”。此收件箱在处理函数的 reply
arg 中设置。
因此您必须将您的回复发布到 QueueSubscribe()
中处理函数的 reply
arg 返回的主题中,因此将该行更改为:
c.Publish(reply, response)
reply arg 的值对于通信很重要,是这样的:_INBOX.bw5EtJShBTI9OQdvxFOBlz.VxsGBcjH