NATS async reply to request 不是异步的
NATS async reply to request is not asynchronnous
我正在尝试使用 GO 语言在 gnatsd 中实现 request/response 功能,我意识到 gnatsd 不会以异步方式回复请求。
我开始使用 NATS github 示例 https://github.com/nats-io/go-nats/tree/master/examples - 示例 nats-req.go 和 nats-rply.go 进行调查。这些例子效果很好。
然后我简单地修改了它们以测试 gnatsd 上的并行请求,并提供一些调试信息,其中处理异步回复的 goroutine ID。
有修改示例源码
nats-rply.go 已被修改为简单地 return 传入请求的返回文本,其中包含有关当前 goroutine ID 的信息。我还在异步处理函数中添加了 1 秒睡眠来模拟一些处理时间。
package main
import (
"fmt"
"github.com/nats-io/go-nats"
"flag"
"log"
"runtime"
"time"
"bytes"
"strconv"
)
// NOTE: Use tls scheme for TLS, e.g. nats-rply -s tls://demo.nats.io:4443 foo hello
func usage() {
log.Fatalf("Usage: nats-rply [-s server][-t] <subject> \n")
}
func printMsg(m *nats.Msg, i int) {
log.Printf("[#%d] Received on [%s]: '%s'\n", i, m.Subject, string(m.Data))
}
func main() {
log.Printf("Main goroutine ID:%d\n", getGID())
var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
var showTime = flag.Bool("t", false, "Display timestamps")
//log.SetFlags(0)
flag.Usage = usage
flag.Parse()
args := flag.Args()
if len(args) < 1 {
usage()
}
nc, err := nats.Connect(*urls)
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}
subj, i := args[0], 0
nc.Subscribe(subj, func(msg *nats.Msg) {
i++
printMsg(msg, i)
//simulation of some processing time
time.Sleep(1 * time.Second)
newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
nc.Publish(msg.Reply, []byte(newreply))
})
nc.Flush()
if err := nc.LastError(); err != nil {
log.Fatal(err)
}
log.Printf("Listening on [%s]\n", subj)
if *showTime {
log.SetFlags(log.LstdFlags)
}
runtime.Goexit()
}
func getGID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
nats-req.go 已修改为在并行启动的单独 10 个 goroutine 中发送 10 个请求,请求超时已设置为 3.5 秒。我尝试了具有共享 NATS 连接的 goroutines(函数 oneReq())以及具有自己的 NATS 连接的 goroutines(函数 onReqSeparateConnect())——同样不成功。
package main
import (
"flag"
"fmt"
"github.com/nats-io/go-nats"
"sync"
"time"
"log"
)
// NOTE: Use tls scheme for TLS, e.g. nats-req -s tls://demo.nats.io:4443 foo hello
func usage() {
log.Fatalf("Usage: nats-req [-s server (%s)] <subject> \n", nats.DefaultURL)
}
func main() {
//var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
//log.SetFlags(0)
flag.Usage = usage
flag.Parse()
args := flag.Args()
if len(args) < 1 {
usage()
}
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}
defer nc.Close()
subj := args[0]
var wg sync.WaitGroup
wg.Add(10)
for i := 1; i <= 10; i++ {
//go oneReq(subj, fmt.Sprintf("Request%d", i), nc, &wg)
go oneReqSeparateConnect(subj, fmt.Sprintf("Request%d", i), &wg)
}
wg.Wait()
}
func oneReq(subj string, payload string, nc *nats.Conn, wg *sync.WaitGroup) {
defer wg.Done()
msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
if err != nil {
if nc.LastError() != nil {
log.Printf("Error in Request: %v\n", nc.LastError())
}
log.Printf("Error in Request: %v\n", err)
} else {
log.Printf("Published [%s] : '%s'\n", subj, payload)
log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data))
}
}
func oneReqSeparateConnect(subj string, payload string, wg *sync.WaitGroup) {
defer wg.Done()
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Printf("Can't connect: %v\n", err)
return
}
defer nc.Close()
msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
if err != nil {
if nc.LastError() != nil {
log.Printf("Error in Request: %v\n", nc.LastError())
}
log.Printf("Error in Request: %v\n", err)
} else {
log.Printf("Published [%s] : '%s'\n", subj, payload)
log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data))
}
}
结果是 - 不需要的行为,看起来 nats-rply.go 只创建了一个 goroutine 来处理传入的请求,并且请求是以串行方式处理的。
nats-req.go 一次发送所有 10 个请求,超时设置为 3.5 秒。 nats-rply.go 开始以串行方式以一秒的间隔响应请求,因此只有 3 个请求得到满足,直到超过 3.5 秒超时 - 其余请求超时。响应消息还包含 GoroutineID,它对所有传入请求都是相同的!即使再次启动 nats-req,goroutine id 也是相同的,只有当 nats-rply.go 服务器重新启动时,ID 才会改变。
nats-req.go日志
D:\PRAC\TSP\AMON>nats-req foo
2017/08/29 18:46:48 Sending: 'Request9'
2017/08/29 18:46:48 Sending: 'Request7'
2017/08/29 18:46:48 Sending: 'Request10'
2017/08/29 18:46:48 Sending: 'Request4'
2017/08/29 18:46:48 Sending: 'Request8'
2017/08/29 18:46:48 Sending: 'Request6'
2017/08/29 18:46:48 Sending: 'Request1'
2017/08/29 18:46:48 Sending: 'Request5'
2017/08/29 18:46:48 Sending: 'Request2'
2017/08/29 18:46:48 Sending: 'Request3'
2017/08/29 18:46:49 Published [foo] : 'Request9'
2017/08/29 18:46:49 Received [_INBOX.xrsXYOB2QmW1f52pkfLHya.xrsXYOB2QmW1f52pkfLHzJ] : 'REPLY TO request "Request9", GoroutineId:36'
2017/08/29 18:46:50 Published [foo] : 'Request7'
2017/08/29 18:46:50 Received [_INBOX.xrsXYOB2QmW1f52pkfLI02.xrsXYOB2QmW1f52pkfLI0l] : 'REPLY TO request "Request7", GoroutineId:36'
2017/08/29 18:46:51 Published [foo] : 'Request10'
2017/08/29 18:46:51 Received [_INBOX.xrsXYOB2QmW1f52pkfLI1U.xrsXYOB2QmW1f52pkfLI2D] : 'REPLY TO request "Request10", GoroutineId:36'
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
nats-rply.go日志
C:\Users\belunek>nats-rply foo
2017/08/29 18:46:46 Main goroutine ID:1
2017/08/29 18:46:46 Listening on [foo]
2017/08/29 18:46:48 [#1] Received on [foo]: 'Request9'
2017/08/29 18:46:49 [#2] Received on [foo]: 'Request7'
2017/08/29 18:46:50 [#3] Received on [foo]: 'Request10'
2017/08/29 18:46:51 [#4] Received on [foo]: 'Request4'
2017/08/29 18:46:52 [#5] Received on [foo]: 'Request8'
2017/08/29 18:46:53 [#6] Received on [foo]: 'Request6'
2017/08/29 18:46:54 [#7] Received on [foo]: 'Request1'
2017/08/29 18:46:55 [#8] Received on [foo]: 'Request5'
2017/08/29 18:46:56 [#9] Received on [foo]: 'Request2'
2017/08/29 18:46:57 [#10] Received on [foo]: 'Request3'
请问如何使用异步(并行)响应处理在 NATS 中正确实施 request/response 通信?
感谢您提供任何信息。
Gnatsd 以异步方式回复 Request
,但它不会为每个请求启动 goroutine,只是纯异步。并且因为你使用 time.Sleep
模拟处理负载,它暂停调用 goroutine,它看起来像同步处理。如果您修改您的示例以使用 goroutines,一切正常。
...
nc.Subscribe(subj, func(msg *nats.Msg) {
go handler(msg, i, nc)
})
...
func handler(msg *nats.Msg, i int, nc *nats.Conn) {
i++
printMsg(msg, i)
//simulation of some processing time
time.Sleep(1 * time.Second)
newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
nc.Publish(msg.Reply, []byte(newreply))
}
输出:
./nats-rply test
2017/08/30 00:17:05 Main goroutine ID:1
2017/08/30 00:17:05 Listening on [test]
2017/08/30 00:17:11 [#1] Received on [test]: 'Request6'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request5'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request1'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request8'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request3'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request7'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request9'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request4'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request2'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request10'
./nats-req test
2017/08/30 00:17:12 Published [test] : 'Request3'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Bq] : 'REPLY TO request "Request3", GoroutineId:37'
2017/08/30 00:17:12 Published [test] : 'Request7'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5z6] : 'REPLY TO request "Request7", GoroutineId:42'
2017/08/30 00:17:12 Published [test] : 'Request10'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5wY] : 'REPLY TO request "Request10", GoroutineId:43'
2017/08/30 00:17:12 Published [test] : 'Request5'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6EO] : 'REPLY TO request "Request5", GoroutineId:34'
2017/08/30 00:17:12 Published [test] : 'Request8'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm66k] : 'REPLY TO request "Request8", GoroutineId:36'
2017/08/30 00:17:12 Published [test] : 'Request1'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm64C] : 'REPLY TO request "Request1", GoroutineId:35'
2017/08/30 00:17:12 Published [test] : 'Request2'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Gw] : 'REPLY TO request "Request2", GoroutineId:41'
2017/08/30 00:17:12 Published [test] : 'Request4'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm69I] : 'REPLY TO request "Request4", GoroutineId:40'
2017/08/30 00:17:12 Published [test] : 'Request9'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm61e] : 'REPLY TO request "Request9", GoroutineId:39'
2017/08/30 00:17:12 Published [test] : 'Request6'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5u0] : 'REPLY TO request "Request6", GoroutineId:38'
请记住,通过从消息处理程序启动一个 go-routine,您的处理顺序超出 window。这就是 NATS 串行调用消息处理程序以向用户提供有保证的顺序的原因。如果顺序对您来说不重要,那么确实很容易在单独的 go-routine(或 go-routines 池)中开始处理消息。
我正在尝试使用 GO 语言在 gnatsd 中实现 request/response 功能,我意识到 gnatsd 不会以异步方式回复请求。
我开始使用 NATS github 示例 https://github.com/nats-io/go-nats/tree/master/examples - 示例 nats-req.go 和 nats-rply.go 进行调查。这些例子效果很好。
然后我简单地修改了它们以测试 gnatsd 上的并行请求,并提供一些调试信息,其中处理异步回复的 goroutine ID。 有修改示例源码
nats-rply.go 已被修改为简单地 return 传入请求的返回文本,其中包含有关当前 goroutine ID 的信息。我还在异步处理函数中添加了 1 秒睡眠来模拟一些处理时间。
package main
import (
"fmt"
"github.com/nats-io/go-nats"
"flag"
"log"
"runtime"
"time"
"bytes"
"strconv"
)
// NOTE: Use tls scheme for TLS, e.g. nats-rply -s tls://demo.nats.io:4443 foo hello
func usage() {
log.Fatalf("Usage: nats-rply [-s server][-t] <subject> \n")
}
func printMsg(m *nats.Msg, i int) {
log.Printf("[#%d] Received on [%s]: '%s'\n", i, m.Subject, string(m.Data))
}
func main() {
log.Printf("Main goroutine ID:%d\n", getGID())
var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
var showTime = flag.Bool("t", false, "Display timestamps")
//log.SetFlags(0)
flag.Usage = usage
flag.Parse()
args := flag.Args()
if len(args) < 1 {
usage()
}
nc, err := nats.Connect(*urls)
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}
subj, i := args[0], 0
nc.Subscribe(subj, func(msg *nats.Msg) {
i++
printMsg(msg, i)
//simulation of some processing time
time.Sleep(1 * time.Second)
newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
nc.Publish(msg.Reply, []byte(newreply))
})
nc.Flush()
if err := nc.LastError(); err != nil {
log.Fatal(err)
}
log.Printf("Listening on [%s]\n", subj)
if *showTime {
log.SetFlags(log.LstdFlags)
}
runtime.Goexit()
}
func getGID() uint64 {
b := make([]byte, 64)
b = b[:runtime.Stack(b, false)]
b = bytes.TrimPrefix(b, []byte("goroutine "))
b = b[:bytes.IndexByte(b, ' ')]
n, _ := strconv.ParseUint(string(b), 10, 64)
return n
}
nats-req.go 已修改为在并行启动的单独 10 个 goroutine 中发送 10 个请求,请求超时已设置为 3.5 秒。我尝试了具有共享 NATS 连接的 goroutines(函数 oneReq())以及具有自己的 NATS 连接的 goroutines(函数 onReqSeparateConnect())——同样不成功。
package main
import (
"flag"
"fmt"
"github.com/nats-io/go-nats"
"sync"
"time"
"log"
)
// NOTE: Use tls scheme for TLS, e.g. nats-req -s tls://demo.nats.io:4443 foo hello
func usage() {
log.Fatalf("Usage: nats-req [-s server (%s)] <subject> \n", nats.DefaultURL)
}
func main() {
//var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
//log.SetFlags(0)
flag.Usage = usage
flag.Parse()
args := flag.Args()
if len(args) < 1 {
usage()
}
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("Can't connect: %v\n", err)
}
defer nc.Close()
subj := args[0]
var wg sync.WaitGroup
wg.Add(10)
for i := 1; i <= 10; i++ {
//go oneReq(subj, fmt.Sprintf("Request%d", i), nc, &wg)
go oneReqSeparateConnect(subj, fmt.Sprintf("Request%d", i), &wg)
}
wg.Wait()
}
func oneReq(subj string, payload string, nc *nats.Conn, wg *sync.WaitGroup) {
defer wg.Done()
msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
if err != nil {
if nc.LastError() != nil {
log.Printf("Error in Request: %v\n", nc.LastError())
}
log.Printf("Error in Request: %v\n", err)
} else {
log.Printf("Published [%s] : '%s'\n", subj, payload)
log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data))
}
}
func oneReqSeparateConnect(subj string, payload string, wg *sync.WaitGroup) {
defer wg.Done()
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Printf("Can't connect: %v\n", err)
return
}
defer nc.Close()
msg, err := nc.Request(subj, []byte(payload), 3500*time.Millisecond)
if err != nil {
if nc.LastError() != nil {
log.Printf("Error in Request: %v\n", nc.LastError())
}
log.Printf("Error in Request: %v\n", err)
} else {
log.Printf("Published [%s] : '%s'\n", subj, payload)
log.Printf("Received [%v] : '%s'\n", msg.Subject, string(msg.Data))
}
}
结果是 - 不需要的行为,看起来 nats-rply.go 只创建了一个 goroutine 来处理传入的请求,并且请求是以串行方式处理的。 nats-req.go 一次发送所有 10 个请求,超时设置为 3.5 秒。 nats-rply.go 开始以串行方式以一秒的间隔响应请求,因此只有 3 个请求得到满足,直到超过 3.5 秒超时 - 其余请求超时。响应消息还包含 GoroutineID,它对所有传入请求都是相同的!即使再次启动 nats-req,goroutine id 也是相同的,只有当 nats-rply.go 服务器重新启动时,ID 才会改变。
nats-req.go日志
D:\PRAC\TSP\AMON>nats-req foo
2017/08/29 18:46:48 Sending: 'Request9'
2017/08/29 18:46:48 Sending: 'Request7'
2017/08/29 18:46:48 Sending: 'Request10'
2017/08/29 18:46:48 Sending: 'Request4'
2017/08/29 18:46:48 Sending: 'Request8'
2017/08/29 18:46:48 Sending: 'Request6'
2017/08/29 18:46:48 Sending: 'Request1'
2017/08/29 18:46:48 Sending: 'Request5'
2017/08/29 18:46:48 Sending: 'Request2'
2017/08/29 18:46:48 Sending: 'Request3'
2017/08/29 18:46:49 Published [foo] : 'Request9'
2017/08/29 18:46:49 Received [_INBOX.xrsXYOB2QmW1f52pkfLHya.xrsXYOB2QmW1f52pkfLHzJ] : 'REPLY TO request "Request9", GoroutineId:36'
2017/08/29 18:46:50 Published [foo] : 'Request7'
2017/08/29 18:46:50 Received [_INBOX.xrsXYOB2QmW1f52pkfLI02.xrsXYOB2QmW1f52pkfLI0l] : 'REPLY TO request "Request7", GoroutineId:36'
2017/08/29 18:46:51 Published [foo] : 'Request10'
2017/08/29 18:46:51 Received [_INBOX.xrsXYOB2QmW1f52pkfLI1U.xrsXYOB2QmW1f52pkfLI2D] : 'REPLY TO request "Request10", GoroutineId:36'
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
2017/08/29 18:46:52 Error in Request: nats: timeout
nats-rply.go日志
C:\Users\belunek>nats-rply foo
2017/08/29 18:46:46 Main goroutine ID:1
2017/08/29 18:46:46 Listening on [foo]
2017/08/29 18:46:48 [#1] Received on [foo]: 'Request9'
2017/08/29 18:46:49 [#2] Received on [foo]: 'Request7'
2017/08/29 18:46:50 [#3] Received on [foo]: 'Request10'
2017/08/29 18:46:51 [#4] Received on [foo]: 'Request4'
2017/08/29 18:46:52 [#5] Received on [foo]: 'Request8'
2017/08/29 18:46:53 [#6] Received on [foo]: 'Request6'
2017/08/29 18:46:54 [#7] Received on [foo]: 'Request1'
2017/08/29 18:46:55 [#8] Received on [foo]: 'Request5'
2017/08/29 18:46:56 [#9] Received on [foo]: 'Request2'
2017/08/29 18:46:57 [#10] Received on [foo]: 'Request3'
请问如何使用异步(并行)响应处理在 NATS 中正确实施 request/response 通信? 感谢您提供任何信息。
Gnatsd 以异步方式回复 Request
,但它不会为每个请求启动 goroutine,只是纯异步。并且因为你使用 time.Sleep
模拟处理负载,它暂停调用 goroutine,它看起来像同步处理。如果您修改您的示例以使用 goroutines,一切正常。
...
nc.Subscribe(subj, func(msg *nats.Msg) {
go handler(msg, i, nc)
})
...
func handler(msg *nats.Msg, i int, nc *nats.Conn) {
i++
printMsg(msg, i)
//simulation of some processing time
time.Sleep(1 * time.Second)
newreply := []byte(fmt.Sprintf("REPLY TO request \"%s\", GoroutineId:%d", string(msg.Data), getGID()))
nc.Publish(msg.Reply, []byte(newreply))
}
输出:
./nats-rply test
2017/08/30 00:17:05 Main goroutine ID:1
2017/08/30 00:17:05 Listening on [test]
2017/08/30 00:17:11 [#1] Received on [test]: 'Request6'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request5'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request1'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request8'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request3'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request7'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request9'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request4'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request2'
2017/08/30 00:17:11 [#1] Received on [test]: 'Request10'
./nats-req test
2017/08/30 00:17:12 Published [test] : 'Request3'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Bq] : 'REPLY TO request "Request3", GoroutineId:37'
2017/08/30 00:17:12 Published [test] : 'Request7'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5z6] : 'REPLY TO request "Request7", GoroutineId:42'
2017/08/30 00:17:12 Published [test] : 'Request10'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5wY] : 'REPLY TO request "Request10", GoroutineId:43'
2017/08/30 00:17:12 Published [test] : 'Request5'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6EO] : 'REPLY TO request "Request5", GoroutineId:34'
2017/08/30 00:17:12 Published [test] : 'Request8'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm66k] : 'REPLY TO request "Request8", GoroutineId:36'
2017/08/30 00:17:12 Published [test] : 'Request1'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm64C] : 'REPLY TO request "Request1", GoroutineId:35'
2017/08/30 00:17:12 Published [test] : 'Request2'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm6Gw] : 'REPLY TO request "Request2", GoroutineId:41'
2017/08/30 00:17:12 Published [test] : 'Request4'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm69I] : 'REPLY TO request "Request4", GoroutineId:40'
2017/08/30 00:17:12 Published [test] : 'Request9'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm61e] : 'REPLY TO request "Request9", GoroutineId:39'
2017/08/30 00:17:12 Published [test] : 'Request6'
2017/08/30 00:17:12 Received [_INBOX.xoG573m0V7dVoIJxojm5u0] : 'REPLY TO request "Request6", GoroutineId:38'
请记住,通过从消息处理程序启动一个 go-routine,您的处理顺序超出 window。这就是 NATS 串行调用消息处理程序以向用户提供有保证的顺序的原因。如果顺序对您来说不重要,那么确实很容易在单独的 go-routine(或 go-routines 池)中开始处理消息。