使用 net.Conn 连接正确处理读写

Proper handling of reading and writing with a net.Conn connection

我正在尝试使用网络连接进行读写。似乎正在发生的事情是,如果数据到达服务器的速度太快,一些数据就会被丢弃或丢失。客户端连接,协商连接,然后我让它快速连续发送 3 个命令来更新状态页面。每个通信都是一个 JSON 字符串,它被转换为结构并使用存储的密钥解码。

如果我多次点击客户端上的请求(每次都生成 3 个以 \n 结尾的 JSON 负载),服务器有时会抛出错误:顶级值后的无效字符 X。我转储了客户端发送的信息,看起来客户端有 3 个格式正确的 JSON 条目;在服务器端,JSON 有效负载之一似乎完全丢失,其中一个丢失前 515 个字符。由于缺少前 515 个字符,JSON 格式错误,因此编组失败。

我的问题是,如何防止从连接读取的数据丢失?我是遇到了某种竞争条件还是对如何读取和发送连接处理不当?

下面基本上是我用于客户端连接处理程序的内容。客户端和服务器协商加密连接,因此有多个对模式和 RSA 状态的引用,当密钥设置正确时使用模式 4,以便服务器和客户端可以交换命令和结果。在高层次上,处理程序分离出一个 goroutine,该 goroutine 从连接中读取并将其发送到通道。该字符串被读取并转换为结构。会话的第一部分专门用于“握手”以协商加密密钥并保存会话信息;一旦到达第 4 阶段,该结构将携带来自客户端的加密命令并将结果发回,直到连接错误或关闭。

func HandleClientConnection(conClient net.Conn) {
    defer conClient.Close()

    chnLogging <- "Connection from " + conClient.RemoteAddr().String()

    tmTimeout := time.NewTimer(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES)
    chnCloseConn := make(chan bool)

    chnDataFromClient := make(chan string, 1000)
    go func(chnData chan string) {

        for {
            netData, err := bufio.NewReader(conClient).ReadString('\n')
            if err != nil {
                if !strings.Contains(err.Error(), "EOF") {
                    chnLogging <- "Error from client " + conClient.RemoteAddr().String() + ": " + err.Error()
                } else {
                    chnLogging <- "Client " + conClient.RemoteAddr().String() + " disconnected"
                }
                chnCloseConn <- true
                return
            }

            tmTimeout.Stop()
            tmTimeout.Reset(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES)

            chnData <- netData
        }
    }(chnDataFromClient)

    for {
        select {
        case <-chnCloseConn:
            chnLogging <- "Connection listener exiting for " + conClient.RemoteAddr().String()
            return
        case <-tmTimeout.C:
            chnLogging <- "Connection Timeout for " + conClient.RemoteAddr().String()
            return
        case strNetData := <-chnDataFromClient:

            var strctNetEncrypted stctNetEncrypted
            err := json.Unmarshal([]byte(strNetData), &strctNetEncrypted)
            CheckErr(err)

            switch strctNetEncrypted.IntMode {
            case 1:

                keyPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
                CheckErr(err)

                btServerPrivateKey, err := json.Marshal(keyPrivateKey)
                CheckErr(err)

                strctClientPubKeys.SetClientPubkey(strctNetEncrypted.BtPubKey, btServerPrivateKey)

                defer strctClientPubKeys.DelClientPubkey(strctNetEncrypted.BtPubKey)

                strctConnections.SetConnection(strctNetEncrypted.BtPubKey, conClient)

                defer strctConnections.DelConnection(strctNetEncrypted.BtPubKey)

                strctNetResponse := CreateStctNetEncryptedToClient("", strctNetEncrypted.BtPubKey, 2)
                if strctNetResponse.BtPubKey == nil ||
                    strctNetResponse.BtRemotePubKey == nil {
                    chnLogging <- "Error generating stage two response struct"
                    chnCloseConn <- true
                    return
                }

                btJSON, err := json.Marshal(strctNetResponse)
                CheckErr(err)
                chnLogging <- "Sending stage 2 negotation response"
                conClient.Write(btJSON)
                conClient.Write([]byte("\n"))

            case 2:

                chnLogging <- "WARNING: Received mode 2 network communication even though I shouldn't have"

            case 3:

                chnLogging <- "Received stage 3 negotiation response"

                strMessage, err := strctNetEncrypted.RSADecrypt()
                CheckErr(err)

                if len(strMessage) != 32 {
                    chnLogging <- "Unexpected shared key length; Aborting"
                    chnCloseConn <- true
                    return
                }

                strctClientPubKeys.SetClientSharedKey(strMessage, strctNetEncrypted.BtPubKey, conClient.RemoteAddr().String())

            case 4:

                strMessageDecrypted := DecryptPayloadFromClient(strctNetEncrypted)

                if strMessageDecrypted != "" {

                    if strings.ToLower(strMessageDecrypted) == "close" {
                        chnLogging <- "Client requests disconnection"
                        chnCloseConn <- true
                        return
                    }

                    // Keepalive message; disregard
                    if strMessageDecrypted == "PING" {
                        continue
                    }

                    btResult := InterpretClientCommand(strMessageDecrypted)

                    strctResponse := CreateStctNetEncryptedToClient(string(btResult), strctNetEncrypted.BtPubKey, 4)

                    btJSON, err := json.Marshal(strctResponse)
                    CheckErr(err)
                    conClient.Write(btJSON)
                    conClient.Write([]byte("\n"))

                } else {
                    chnLogging <- "Invalid command \"" + strMessageDecrypted + "\""
                }

            default:

                chnLogging <- "ERROR: Message received without mode set"
            }
        }
    }
}

应用程序将数据吞噬到缓冲的 reader,然后丢弃 reader 以及它可能已经缓冲到第一行之后的任何数据。

在连接的生命周期内保留缓冲 reader:

    rdr := bufio.NewReader(conClient)
    for {
        netData, err := rdr.ReadString('\n')
        ...

您可以通过消除 goroutine 来简化代码(并修复与缓冲区问题无关的其他问题)。使用读取截止时间来处理无响应的服务器。

func HandleClientConnection(conClient net.Conn) {
    defer conClient.Close()
    chnLogging <- "Connection from " + conClient.RemoteAddr().String()
    conClient.SetReadDeadline(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES)
    scanner := bufio.NewScanner(conClient)
    for scanner.Scan() {
        var strctNetEncrypted stctNetEncrypted
        err := json.Unmarshal(scanner.Bytes(), &strctNetEncrypted)
        CheckErr(err)
        switch strctNetEncrypted.IntMode {
             // Insert contents of switch statement from
             // question here with references to 
             // chnCloseConn removed.
        }
        conClient.SetReadDeadline(time.Minute * SERVER_INACTIVITY_TIMEOUT_MINUTES)
    }
    if scanner.Err() != nil {
        chnLogging <- "Error from client " + conClient.RemoteAddr().String() + ": " + err.Error()
    } else {
        chnLogging <- "Client " + conClient.RemoteAddr().String() + " disconnected"
    }
}