Go:在 For 循环中取消上下文
Go : Cancel Context inside a For loop
我正在尝试在 Golang 中创建一个 UDP 服务器以在端口上监听,例如。 1234. 我有一个客户端向此服务器发送 start/stop 消息。
在收到消息“start”时,服务器将开始向该客户端发送随机数据,在停止时,服务器将停止向客户端发送。
为此,我使用上下文创建一个 goroutine 来发送数据并在它“停止”时取消它。
我得到的错误是该程序在一个客户端上工作正常,但如果我再次启动该客户端,则不会再次发送数据。
任何帮助将不胜感激?
UDP 服务器代码:
package main
import (
"context"
"fmt"
"math/rand"
"net"
"time"
)
func generateMessageToUDP(ctx context.Context, addr *net.UDPAddr) {
// stop writing to UDP
done := false
fmt.Println("Generating message to UDP client", addr)
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
fmt.Println("Error: ", err)
}
defer func(conn *net.UDPConn) {
err := conn.Close()
if err != nil {
fmt.Println("Error in closing the UDP Connection: ", err)
}
}(conn)
// write to address using UDP connection
go func() {
for i := 0; !done; i++ {
RandomInt := rand.Intn(100)
fmt.Println("Random Int: ", RandomInt)
_, err = conn.Write([]byte(fmt.Sprintf("%d", RandomInt)))
fmt.Println("Sent ", RandomInt, " to ", addr)
time.Sleep(time.Second * 1)
}
}()
<-ctx.Done()
fmt.Println("Stopping writing to UDP client", addr)
done = true
}
//var addr *net.UDPAddr
//var conn *net.UDPConn
func main() {
fmt.Println("Hi this is a UDP server")
udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
if err != nil {
fmt.Println("Error: ", err)
}
defer func(udpServer *net.UDPConn) {
err := udpServer.Close()
if err != nil {
fmt.Println("Error in closing the UDP Connection: ", err)
}
}(udpServer)
// create a buffer to read data into
buffer := make([]byte, 1024)
ctx, cancel := context.WithCancel(context.Background())
for {
// read the incoming connection into the buffer
n, addr, err := udpServer.ReadFromUDP(buffer)
fmt.Println("Recieved ", string(buffer[0:n]), " from ", addr)
if err != nil {
fmt.Println("Error: ", err)
}
fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
if string(buffer[0:n]) == "stop" {
fmt.Println("Stopped listening")
cancel()
continue
} else if string(buffer[0:n]) == "start" {
// send a response back to the client
_, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
if err != nil {
fmt.Println("Error: ", err)
}
// start a routine to generate messages to the client
generateMessageToUDP(ctx, addr)
} else {
fmt.Println("Unknown command")
}
}
}
客户代码:
package main
import (
"fmt"
"net"
"time"
)
func main() {
fmt.Println("Hello, I am a client")
// Create a new client
localAddr, err := net.ResolveUDPAddr("udp", ":5011")
client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
if err != nil {
fmt.Println(err)
return
}
defer client3.Close()
_, err = client3.Write([]byte("start"))
if err != nil {
fmt.Println(err)
return
}
fmt.Println("Message sent. Sleeping for 5 seconds")
time.Sleep(time.Second * 5)
fmt.Println("Sending stop message")
_, err = client3.Write([]byte("stop"))
if err != nil {
fmt.Println(err)
}
}
好的,我在服务器上做了一个简单的 hack,并在创建上下文之前添加了一个标签 Start,当我取消上下文时,我添加了 goto 标签。这意味着当任务被取消时,它将再次创建上下文并开始执行它的工作
你必须注意你所做的事情。
避免数据竞争(完成变量是 read/write 由两个没有同步机制的不同例程)https://go.dev/doc/articles/race_detector
不要在每次程序开始向新客户端发送消息时都创建新的拨号程序。这将打开一个新的本地地址并使用它发送给客户端。客户端将从另一个地址接收消息,它通常应该忽略这些消息,因为它没有启动与该远程地址的任何交换。
不要混淆客户端生命周期和程序上下文生命周期。在提供的代码中,客户端发送停止消息将触发整个程序的取消功能,它将停止所有客户端。为每个客户端创建一个新的上下文,从程序上下文派生,在收到停止消息时取消相关的客户端上下文。
UDP 连接由所有客户端共享,不能因为程序正在为客户端服务而停止侦听传入数据包。 IE 对 generateMessageToUDP
的调用应该在另一个例程中执行。
以下是考虑到这些评论的修订版。
添加 var peers map[string]peer
以将远程地址与上下文匹配。类型 peer
定义为 struct {stop func();since time.Time}
。收到开始消息后,peer
被添加到具有派生上下文 pctx, pcancel := context.WithCancel(ctx)
的 map
。然后在不同的例程 go generateMessageToUDP(pctx, udpServer, addr)
中为新客户端提供服务,该例程绑定到新创建的上下文和服务器套接字。收到停止消息后,程序执行查找 peer, ok := peers[addr.String()]
,然后取消关联的对等上下文 peer.stop(); delete(peers, addr.String())
并忘记对等。
package main
import (
"context"
"fmt"
"math/rand"
"net"
"time"
)
func generateMessageToUDP(ctx context.Context, conn *net.UDPConn, addr *net.UDPAddr) {
fmt.Println("Generating message to UDP client", addr)
go func() {
for i := 0; ; i++ {
RandomInt := rand.Intn(100)
d := []byte(fmt.Sprintf("%d", RandomInt))
conn.WriteTo(d, addr)
time.Sleep(time.Second * 1)
}
}()
<-ctx.Done()
fmt.Println("Stopping writing to UDP client", addr)
}
//var addr *net.UDPAddr
//var conn *net.UDPConn
func main() {
fmt.Println("Hi this is a UDP server")
udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
if err != nil {
fmt.Println("Error: ", err)
}
defer func(udpServer *net.UDPConn) {
err := udpServer.Close()
if err != nil {
fmt.Println("Error in closing the UDP Connection: ", err)
}
}(udpServer)
// create a buffer to read data into
type peer struct {
stop func()
since time.Time
}
peers := map[string]peer{}
buffer := make([]byte, 1024)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
// read the incoming connection into the buffer
n, addr, err := udpServer.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error: ", err)
}
fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
if string(buffer[0:n]) == "stop" {
fmt.Println("Stopped listening")
peer, ok := peers[addr.String()]
if !ok {
continue
}
peer.stop()
delete(peers, addr.String())
continue
} else if string(buffer[0:n]) == "start" {
peer, ok := peers[addr.String()]
if ok {
continue
}
pctx, pcancel := context.WithCancel(ctx)
peer.stop = pcancel
peer.since = time.Now()
peers[addr.String()] = peer
// send a response back to the client
_, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
if err != nil {
fmt.Println("Error: ", err)
}
// start a routine to generate messages to the client
go generateMessageToUDP(pctx, udpServer, addr)
} else if string(buffer[0:n]) == "ping" {
peer, ok := peers[addr.String()]
if !ok {
continue
}
peer.since = time.Now()
peers[addr.String()] = peer
} else {
fmt.Println("Unknown command")
}
for addr, p := range peers {
if time.Since(p.since) > time.Minute {
fmt.Println("Peer timedout")
p.stop()
delete(peers, addr)
}
}
}
}
-- go.mod --
module play.ground
-- client.go --
package main
import (
"fmt"
"log"
"net"
"time"
)
func main() {
fmt.Println("Hello, I am a client")
// Create a new client
localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5011")
client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
if err != nil {
fmt.Println(err)
return
}
defer client3.Close()
var n int
n, err = client3.Write([]byte("start"))
if err != nil {
fmt.Println(err)
return
}
log.Println(n)
now := time.Now()
b := make([]byte, 2048)
for time.Since(now) < time.Second*10 {
n, addr, err := client3.ReadFrom(b)
fmt.Println(n, addr, err)
if err != nil {
fmt.Println(err)
continue
}
if addr.String() == "127.0.0.1:5010" {
m := b[:n]
fmt.Println("message:", string(m))
}
}
fmt.Println("Sending stop message")
_, err = client3.Write([]byte("stop"))
if err != nil {
fmt.Println(err)
}
}
在
go func() {
for i := 0; ; i++ {
RandomInt := rand.Intn(100)
d := []byte(fmt.Sprintf("%d", RandomInt))
conn.WriteTo(d, addr)
time.Sleep(time.Second * 1)
}
}()
我把 reader 遗漏的 select 作为练习留给了上下文通道,以确定例程是否应该退出。
我正在尝试在 Golang 中创建一个 UDP 服务器以在端口上监听,例如。 1234. 我有一个客户端向此服务器发送 start/stop 消息。
在收到消息“start”时,服务器将开始向该客户端发送随机数据,在停止时,服务器将停止向客户端发送。
为此,我使用上下文创建一个 goroutine 来发送数据并在它“停止”时取消它。
我得到的错误是该程序在一个客户端上工作正常,但如果我再次启动该客户端,则不会再次发送数据。
任何帮助将不胜感激?
UDP 服务器代码:
package main
import (
"context"
"fmt"
"math/rand"
"net"
"time"
)
func generateMessageToUDP(ctx context.Context, addr *net.UDPAddr) {
// stop writing to UDP
done := false
fmt.Println("Generating message to UDP client", addr)
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
fmt.Println("Error: ", err)
}
defer func(conn *net.UDPConn) {
err := conn.Close()
if err != nil {
fmt.Println("Error in closing the UDP Connection: ", err)
}
}(conn)
// write to address using UDP connection
go func() {
for i := 0; !done; i++ {
RandomInt := rand.Intn(100)
fmt.Println("Random Int: ", RandomInt)
_, err = conn.Write([]byte(fmt.Sprintf("%d", RandomInt)))
fmt.Println("Sent ", RandomInt, " to ", addr)
time.Sleep(time.Second * 1)
}
}()
<-ctx.Done()
fmt.Println("Stopping writing to UDP client", addr)
done = true
}
//var addr *net.UDPAddr
//var conn *net.UDPConn
func main() {
fmt.Println("Hi this is a UDP server")
udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
if err != nil {
fmt.Println("Error: ", err)
}
defer func(udpServer *net.UDPConn) {
err := udpServer.Close()
if err != nil {
fmt.Println("Error in closing the UDP Connection: ", err)
}
}(udpServer)
// create a buffer to read data into
buffer := make([]byte, 1024)
ctx, cancel := context.WithCancel(context.Background())
for {
// read the incoming connection into the buffer
n, addr, err := udpServer.ReadFromUDP(buffer)
fmt.Println("Recieved ", string(buffer[0:n]), " from ", addr)
if err != nil {
fmt.Println("Error: ", err)
}
fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
if string(buffer[0:n]) == "stop" {
fmt.Println("Stopped listening")
cancel()
continue
} else if string(buffer[0:n]) == "start" {
// send a response back to the client
_, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
if err != nil {
fmt.Println("Error: ", err)
}
// start a routine to generate messages to the client
generateMessageToUDP(ctx, addr)
} else {
fmt.Println("Unknown command")
}
}
}
客户代码:
package main
import (
"fmt"
"net"
"time"
)
func main() {
fmt.Println("Hello, I am a client")
// Create a new client
localAddr, err := net.ResolveUDPAddr("udp", ":5011")
client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
if err != nil {
fmt.Println(err)
return
}
defer client3.Close()
_, err = client3.Write([]byte("start"))
if err != nil {
fmt.Println(err)
return
}
fmt.Println("Message sent. Sleeping for 5 seconds")
time.Sleep(time.Second * 5)
fmt.Println("Sending stop message")
_, err = client3.Write([]byte("stop"))
if err != nil {
fmt.Println(err)
}
}
好的,我在服务器上做了一个简单的 hack,并在创建上下文之前添加了一个标签 Start,当我取消上下文时,我添加了 goto 标签。这意味着当任务被取消时,它将再次创建上下文并开始执行它的工作
你必须注意你所做的事情。
避免数据竞争(完成变量是 read/write 由两个没有同步机制的不同例程)https://go.dev/doc/articles/race_detector
不要在每次程序开始向新客户端发送消息时都创建新的拨号程序。这将打开一个新的本地地址并使用它发送给客户端。客户端将从另一个地址接收消息,它通常应该忽略这些消息,因为它没有启动与该远程地址的任何交换。
不要混淆客户端生命周期和程序上下文生命周期。在提供的代码中,客户端发送停止消息将触发整个程序的取消功能,它将停止所有客户端。为每个客户端创建一个新的上下文,从程序上下文派生,在收到停止消息时取消相关的客户端上下文。
UDP 连接由所有客户端共享,不能因为程序正在为客户端服务而停止侦听传入数据包。 IE 对
generateMessageToUDP
的调用应该在另一个例程中执行。
以下是考虑到这些评论的修订版。
添加 var peers map[string]peer
以将远程地址与上下文匹配。类型 peer
定义为 struct {stop func();since time.Time}
。收到开始消息后,peer
被添加到具有派生上下文 pctx, pcancel := context.WithCancel(ctx)
的 map
。然后在不同的例程 go generateMessageToUDP(pctx, udpServer, addr)
中为新客户端提供服务,该例程绑定到新创建的上下文和服务器套接字。收到停止消息后,程序执行查找 peer, ok := peers[addr.String()]
,然后取消关联的对等上下文 peer.stop(); delete(peers, addr.String())
并忘记对等。
package main
import (
"context"
"fmt"
"math/rand"
"net"
"time"
)
func generateMessageToUDP(ctx context.Context, conn *net.UDPConn, addr *net.UDPAddr) {
fmt.Println("Generating message to UDP client", addr)
go func() {
for i := 0; ; i++ {
RandomInt := rand.Intn(100)
d := []byte(fmt.Sprintf("%d", RandomInt))
conn.WriteTo(d, addr)
time.Sleep(time.Second * 1)
}
}()
<-ctx.Done()
fmt.Println("Stopping writing to UDP client", addr)
}
//var addr *net.UDPAddr
//var conn *net.UDPConn
func main() {
fmt.Println("Hi this is a UDP server")
udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
if err != nil {
fmt.Println("Error: ", err)
}
defer func(udpServer *net.UDPConn) {
err := udpServer.Close()
if err != nil {
fmt.Println("Error in closing the UDP Connection: ", err)
}
}(udpServer)
// create a buffer to read data into
type peer struct {
stop func()
since time.Time
}
peers := map[string]peer{}
buffer := make([]byte, 1024)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
// read the incoming connection into the buffer
n, addr, err := udpServer.ReadFromUDP(buffer)
if err != nil {
fmt.Println("Error: ", err)
}
fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
if string(buffer[0:n]) == "stop" {
fmt.Println("Stopped listening")
peer, ok := peers[addr.String()]
if !ok {
continue
}
peer.stop()
delete(peers, addr.String())
continue
} else if string(buffer[0:n]) == "start" {
peer, ok := peers[addr.String()]
if ok {
continue
}
pctx, pcancel := context.WithCancel(ctx)
peer.stop = pcancel
peer.since = time.Now()
peers[addr.String()] = peer
// send a response back to the client
_, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
if err != nil {
fmt.Println("Error: ", err)
}
// start a routine to generate messages to the client
go generateMessageToUDP(pctx, udpServer, addr)
} else if string(buffer[0:n]) == "ping" {
peer, ok := peers[addr.String()]
if !ok {
continue
}
peer.since = time.Now()
peers[addr.String()] = peer
} else {
fmt.Println("Unknown command")
}
for addr, p := range peers {
if time.Since(p.since) > time.Minute {
fmt.Println("Peer timedout")
p.stop()
delete(peers, addr)
}
}
}
}
-- go.mod --
module play.ground
-- client.go --
package main
import (
"fmt"
"log"
"net"
"time"
)
func main() {
fmt.Println("Hello, I am a client")
// Create a new client
localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5011")
client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
if err != nil {
fmt.Println(err)
return
}
defer client3.Close()
var n int
n, err = client3.Write([]byte("start"))
if err != nil {
fmt.Println(err)
return
}
log.Println(n)
now := time.Now()
b := make([]byte, 2048)
for time.Since(now) < time.Second*10 {
n, addr, err := client3.ReadFrom(b)
fmt.Println(n, addr, err)
if err != nil {
fmt.Println(err)
continue
}
if addr.String() == "127.0.0.1:5010" {
m := b[:n]
fmt.Println("message:", string(m))
}
}
fmt.Println("Sending stop message")
_, err = client3.Write([]byte("stop"))
if err != nil {
fmt.Println(err)
}
}
在
go func() {
for i := 0; ; i++ {
RandomInt := rand.Intn(100)
d := []byte(fmt.Sprintf("%d", RandomInt))
conn.WriteTo(d, addr)
time.Sleep(time.Second * 1)
}
}()
我把 reader 遗漏的 select 作为练习留给了上下文通道,以确定例程是否应该退出。