Go:在 For 循环中取消上下文

Go : Cancel Context inside a For loop

我正在尝试在 Golang 中创建一个 UDP 服务器以在端口上监听,例如。 1234. 我有一个客户端向此服务器发送 start/stop 消息。

在收到消息“start”时,服务器将开始向该客户端发送随机数据,在停止时,服务器将停止向客户端发送。

为此,我使用上下文创建一个 goroutine 来发送数据并在它“停止”时取消它。

我得到的错误是该程序在一个客户端上工作正常,但如果我再次启动该客户端,则不会再次发送数据。

任何帮助将不胜感激?

UDP 服务器代码:

package main

import (
    "context"
    "fmt"
    "math/rand"
    "net"
    "time"
)

func generateMessageToUDP(ctx context.Context, addr *net.UDPAddr) {
    // stop writing to UDP
    done := false
    fmt.Println("Generating message to UDP client", addr)
    conn, err := net.DialUDP("udp", nil, addr)
    if err != nil {
        fmt.Println("Error: ", err)
    }
    defer func(conn *net.UDPConn) {
        err := conn.Close()
        if err != nil {
            fmt.Println("Error in closing the UDP Connection: ", err)
        }
    }(conn)
    // write to address using UDP connection
    go func() {
        for i := 0; !done; i++ {
            RandomInt := rand.Intn(100)
            fmt.Println("Random Int: ", RandomInt)
            _, err = conn.Write([]byte(fmt.Sprintf("%d", RandomInt)))
            fmt.Println("Sent ", RandomInt, " to ", addr)
            time.Sleep(time.Second * 1)
        }
    }()
    <-ctx.Done()
    fmt.Println("Stopping writing to UDP client", addr)
    done = true
}

//var addr *net.UDPAddr
//var conn *net.UDPConn

func main() {
    fmt.Println("Hi this is a UDP server")
    udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
    if err != nil {
        fmt.Println("Error: ", err)
    }
    defer func(udpServer *net.UDPConn) {
        err := udpServer.Close()
        if err != nil {
            fmt.Println("Error in closing the UDP Connection: ", err)
        }
    }(udpServer)
    // create a buffer to read data into
    buffer := make([]byte, 1024)
    ctx, cancel := context.WithCancel(context.Background())
    for {
        // read the incoming connection into the buffer
        n, addr, err := udpServer.ReadFromUDP(buffer)
        fmt.Println("Recieved ", string(buffer[0:n]), " from ", addr)
        if err != nil {
            fmt.Println("Error: ", err)
        }
        fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
        if string(buffer[0:n]) == "stop" {
            fmt.Println("Stopped listening")
            cancel()
            continue
        } else if string(buffer[0:n]) == "start" {
            // send a response back to the client
            _, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
            if err != nil {
                fmt.Println("Error: ", err)
            }
            // start a routine to generate messages to the client
            generateMessageToUDP(ctx, addr)
        } else {
            fmt.Println("Unknown command")
        }
    }
}

客户代码:

package main

import (
    "fmt"
    "net"
    "time"
)

func main() {
    fmt.Println("Hello, I am a client")

    // Create a new client
    localAddr, err := net.ResolveUDPAddr("udp", ":5011")
    client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer client3.Close()
    _, err = client3.Write([]byte("start"))
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("Message sent. Sleeping for 5 seconds")
    time.Sleep(time.Second * 5)
    fmt.Println("Sending stop message")
    _, err = client3.Write([]byte("stop"))
    if err != nil {
        fmt.Println(err)
    }
}

好的,我在服务器上做了一个简单的 hack,并在创建上下文之前添加了一个标签 Start,当我取消上下文时,我添加了 goto 标签。这意味着当任务被取消时,它将再次创建上下文并开始执行它的工作

你必须注意你所做的事情。

  • 避免数据竞争(完成变量是 read/write 由两个没有同步机制的不同例程)https://go.dev/doc/articles/race_detector

  • 不要在每次程序开始向新客户端发送消息时都创建新的拨号程序。这将打开一个新的本地地址并使用它发送给客户端。客户端将从另一个地址接收消息,它通常应该忽略这些消息,因为它没有启动与该远程地址的任何交换。

  • 不要混淆客户端生命周期和程序上下文生命周期。在提供的代码中,客户端发送停止消息将触发整个程序的取消功能,它将停止所有客户端。为每个客户端创建一个新的上下文,从程序上下文派生,在收到停止消息时取消相关的客户端上下文。

  • UDP 连接由所有客户端共享,不能因为程序正在为客户端服务而停止侦听传入数据包。 IE 对 generateMessageToUDP 的调用应该在另一个例程中执行。

以下是考虑到这些评论的修订版。

添加 var peers map[string]peer 以将远程地址与上下文匹配。类型 peer 定义为 struct {stop func();since time.Time}。收到开始消息后,peer 被添加到具有派生上下文 pctx, pcancel := context.WithCancel(ctx)map。然后在不同的例程 go generateMessageToUDP(pctx, udpServer, addr) 中为新客户端提供服务,该例程绑定到新创建的上下文和服务器套接字。收到停止消息后,程序执行查找 peer, ok := peers[addr.String()],然后取消关联的对等上下文 peer.stop(); delete(peers, addr.String()) 并忘记对等。

package main

import (
    "context"
    "fmt"
    "math/rand"
    "net"
    "time"
)

func generateMessageToUDP(ctx context.Context, conn *net.UDPConn, addr *net.UDPAddr) {
    fmt.Println("Generating message to UDP client", addr)
    go func() {
        for i := 0; ; i++ {
            RandomInt := rand.Intn(100)
            d := []byte(fmt.Sprintf("%d", RandomInt))
            conn.WriteTo(d, addr)
            time.Sleep(time.Second * 1)
        }
    }()
    <-ctx.Done()
    fmt.Println("Stopping writing to UDP client", addr)
}

//var addr *net.UDPAddr
//var conn *net.UDPConn

func main() {
    fmt.Println("Hi this is a UDP server")
    udpServer, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 5010})
    if err != nil {
        fmt.Println("Error: ", err)
    }
    defer func(udpServer *net.UDPConn) {
        err := udpServer.Close()
        if err != nil {
            fmt.Println("Error in closing the UDP Connection: ", err)
        }
    }(udpServer)
    // create a buffer to read data into
    type peer struct {
        stop  func()
        since time.Time
    }
    peers := map[string]peer{}
    buffer := make([]byte, 1024)
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    for {
        // read the incoming connection into the buffer
        n, addr, err := udpServer.ReadFromUDP(buffer)
        if err != nil {
            fmt.Println("Error: ", err)
        }
        fmt.Println("Received ", string(buffer[0:n]), " from ", addr)
        if string(buffer[0:n]) == "stop" {
            fmt.Println("Stopped listening")
            peer, ok := peers[addr.String()]
            if !ok {
                continue
            }
            peer.stop()
            delete(peers, addr.String())
            continue
        } else if string(buffer[0:n]) == "start" {
            peer, ok := peers[addr.String()]
            if ok {
                continue
            }
            pctx, pcancel := context.WithCancel(ctx)
            peer.stop = pcancel
            peer.since = time.Now()
            peers[addr.String()] = peer
            // send a response back to the client
            _, err = udpServer.WriteToUDP([]byte("Hi, I am a UDP server"), addr)
            if err != nil {
                fmt.Println("Error: ", err)
            }
            // start a routine to generate messages to the client
            go generateMessageToUDP(pctx, udpServer, addr)
        } else if string(buffer[0:n]) == "ping" {
            peer, ok := peers[addr.String()]
            if !ok {
                continue
            }
            peer.since = time.Now()
            peers[addr.String()] = peer
        } else {
            fmt.Println("Unknown command")
        }
        for addr, p := range peers {
            if time.Since(p.since) > time.Minute {
                fmt.Println("Peer timedout")
                p.stop()
                delete(peers, addr)
            }
        }
    }
}
-- go.mod --
module play.ground
-- client.go --
package main

import (
    "fmt"
    "log"
    "net"
    "time"
)

func main() {
    fmt.Println("Hello, I am a client")

    // Create a new client
    localAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:5011")
    client3, err := net.DialUDP("udp", localAddr, &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 5010})
    if err != nil {
        fmt.Println(err)
        return
    }
    defer client3.Close()
    var n int
    n, err = client3.Write([]byte("start"))
    if err != nil {
        fmt.Println(err)
        return
    }
    log.Println(n)
    now := time.Now()
    b := make([]byte, 2048)
    for time.Since(now) < time.Second*10 {
        n, addr, err := client3.ReadFrom(b)
        fmt.Println(n, addr, err)
        if err != nil {
            fmt.Println(err)
            continue
        }
        if addr.String() == "127.0.0.1:5010" {
            m := b[:n]
            fmt.Println("message:", string(m))
        }
    }
    fmt.Println("Sending stop message")
    _, err = client3.Write([]byte("stop"))
    if err != nil {
        fmt.Println(err)
    }
}

    go func() {
        for i := 0; ; i++ {
            RandomInt := rand.Intn(100)
            d := []byte(fmt.Sprintf("%d", RandomInt))
            conn.WriteTo(d, addr)
            time.Sleep(time.Second * 1)
        }
    }()

我把 reader 遗漏的 select 作为练习留给了上下文通道,以确定例程是否应该退出。