当 JSON 作为响应发送但使用纯字符串时,并发 TCP 服务器挂起

Go concurrent TCP server hangs when JSON is sent as the response but works with plain string

我正在尝试在 Go 中实现一个并发 TCP 服务器,并在 linode 中找到了这个很好的解释 article,它用示例代码清楚地解释了。客户端和服务器的示例代码片段包含在下面。

并发 TCP 服务器,其中为每个 TCP 客户端创建一个新的 go-routine。

package main

import (
        "bufio"
        "fmt"
        "net"
        "os"
        "strconv"
        "strings"
)

var count = 0

func handleConnection(c net.Conn) {
        fmt.Print(".")
        for {
                netData, err := bufio.NewReader(c).ReadString('\n')
                if err != nil {
                        fmt.Println(err)
                        return
                }

                temp := strings.TrimSpace(string(netData))
                if temp == "STOP" {
                        break
                }
                fmt.Println(temp)
                counter := strconv.Itoa(count) + "\n"
                c.Write([]byte(string(counter)))
        }
        c.Close()
}

func main() {
        arguments := os.Args
        if len(arguments) == 1 {
                fmt.Println("Please provide a port number!")
                return
        }

        PORT := ":" + arguments[1]
        l, err := net.Listen("tcp4", PORT)
        if err != nil {
                fmt.Println(err)
                return
        }
        defer l.Close()

        for {
                c, err := l.Accept()
                if err != nil {
                        fmt.Println(err)
                        return
                }
                go handleConnection(c)
                count++
        }
}

TCP 客户端代码片段

package main

import (
        "bufio"
        "fmt"
        "net"
        "os"
        "strings"
)

func main() {
        arguments := os.Args
        if len(arguments) == 1 {
                fmt.Println("Please provide host:port.")
                return
        }

        CONNECT := arguments[1]
        c, err := net.Dial("tcp", CONNECT)
        if err != nil {
                fmt.Println(err)
                return
        }

        for {
                reader := bufio.NewReader(os.Stdin)
                fmt.Print(">> ")
                text, _ := reader.ReadString('\n')
                fmt.Fprintf(c, text+"\n")

                message, _ := bufio.NewReader(c).ReadString('\n')
                fmt.Print("->: " + message)
                if strings.TrimSpace(string(text)) == "STOP" {
                        fmt.Println("TCP client exiting...")
                        return
                }
        }
}

上述并发 TCP 服务器和客户端工作没有任何问题。当我更改 TCP 服务器以发送 JSON 响应而不是文本响应时,问题就出现了。当我更改行时:

counter := strconv.Itoa(count) + "\n"
c.Write([]byte(string(counter)))

res, err := json.Marshal(IdentitySuccess{MessageType: "newidentity", Approved: "approved"})
if err != nil {
    fmt.Printf("Error: %v", err)
}
c.Write(res)

服务器挂起,没有向客户端发送任何响应。奇怪的是,当我用Ctrl+C强制关闭服务器时,服务器将响应发送给客户端。对这种奇怪的行为有什么想法吗?就像服务器持有响应并在存在时发送它。

那个套接字教程,就像许多其他设计错误的套接字教程一样,根本没有解释什么是应用程序协议或为什么需要它。它只说:

In this example, you have implemented an unofficial protocol that is based on TCP.

这个“非官方”协议是最基本的:消息由换行符分隔 (\n)。

除了学习有关套接字的基础知识外,您不应该在任何环境中使用这样的套接字。

您需要一个应用程序协议来构建消息(以便您的客户端和服务器可以识别部分消息和串联消息)。

所以简短的回答:在 JSON 之后发送一个 \n。长答案:不要像这样使用准系统套接字,使用应用程序协议,例如 HTTP。

注意数据竞争。您正在从没有同步机制的不同例程中写入和读取 counter 变量。没有良性的数据竞争。

您的实施还不会成功,因为您没有测试并发客户端查询。

通过使用 -race 标志构建程序来启用竞争检测器,例如 go run -race . / go build -race .

我已经使用 atomic package 函数修复了数据争用问题。

在下面的代码中,我已将您的代码调整为使用 bufio.Scanner instead of bufio.Reader,仅用于演示目的。


    input := bufio.NewScanner(src)
    output := bufio.NewScanner(c)
    for input.Scan() {
        text := input.Text()
        fmt.Fprintf(c, "%v\n", text)
        isEOT := text == "STOP"

        if !output.Scan() {
            fmt.Fprintln(os.Stderr, output.Err())
            return
        }
        message := output.Text()
        fmt.Print("->: " + message)
        if isEOT {
            fmt.Println("All messages sent...")
            return
        }
    }

我还调整了 main 序列以模拟 2 个连续的客户端,使用我一路重置的预定义缓冲区输入。


    input := `hello
world!
STOP
nopnop`
    test := strings.NewReader(input)

    go serve(arguments[1])
  
    test.Reset(input)
    query(arguments[1], test)

    test.Reset(input)
    query(arguments[1], test)

我在你的客户端中添加了一个简单的重试器,它可以帮助我们编写简单的代码。

    c, err := net.Dial("tcp", addr)
    for {
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            <-time.After(time.Second)
            c, err = net.Dial("tcp", addr)
            continue
        }
        break
    }

整个程序被汇集到一个文件中,不太好读取输出,但更容易传输和执行。

https://play.golang.org/p/keKQsKA3fAw

在下面的示例中,我演示了如何使用 json 编组器/解组器来交换结构化数据。

    input := bufio.NewScanner(src)
    dst := json.NewEncoder(c)
    output := json.NewDecoder(c)
    for input.Scan() {
        text := input.Text()
        isEOT := text == "STOP"

        err = dst.Encode(text)
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            return
        }

        var tmp interface{}
        err = output.Decode(&tmp)
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            return
        }
        fmt.Printf("->: %v\n", tmp)
        if isEOT {
            fmt.Println("All messages sent...")
            return
        }
    }

但是!当心,最后一个版本对恶意用户是敏感的。与 bufio.Scannerbufio.Reader 不同,它不检查在线路上读取的数据量。所以它可能会累积数据直到 OOM.

对于服务器端来说尤其如此,在

defer c.Close()
        defer atomic.AddUint64(&count, ^uint64(0))
        input := json.NewDecoder(c)
        output := json.NewEncoder(c)
        fmt.Print(".")
        for {
            var netData interface{}
            input.Decode(&netData)
            fmt.Printf("%v", netData)
            count := atomic.LoadUint64(&count)
            output.Encode(count)
            if x, ok := netData.(string); ok && x == "STOP" {
                break
            }
        }

https://play.golang.org/p/LpIu4ofpm9e

正如 CodeCaster 所回答的,在您的最后一段代码中,不要忘记使用适当的分隔符来构建您的消息。