添加 goroutine 并发以提高代码效率

Adding goroutine concurrency to make code efficient

func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
    // data massaging
    response, err := http.Post(postURL, "application/json", responseBody)
    if err != nil{
        panic(err)
    }
    fmt.Printf("POST : %s\n", status)
    return response
}

func callback(rdb *redis.Client) {
    for {
        data, err := rdb.RPop(ctx, "callback").Result()
        if err == redis.Nil {
            fmt.Println("Sleeping")
            time.Sleep(2 * time.Second) // sleep for 2s
            continue
        }

        // more work

       sendCallback(status, sender, recipient, gatewayMessageId)
    }

}



func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:     "127.0.0.1:6379",
        Password: "",
        DB:       0,
    })
    callback(rdb)
}

我知道上面的代码有缺陷,因为我正在等待处理“数据”。但是,我希望它是非阻塞的,并且一旦数据出现,我就希望对其进行处理。我一直在阅读关于 goroutines 的教程,但我无法理解它。

编辑

根据@torek 的解释,如果我将无限循环从回调函数中取出,在主函数中执行,并让回调专注于它的部分,那么 goroutine 应该如何工作?

func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
    // data massaging
    response, err := http.Post(postURL, "application/json", responseBody)
    if err != nil{
        panic(err)
    }
    fmt.Printf("POST : %s\n", status)
    return response
}

func callback(data string) {

    parsedData := make(map[string]interface{})
    err := json.Unmarshal([]byte(data), &parsedData)
    if err != nil {
        panic(err)
    }

    sender := parsedData["sender"].(string)
    recipient := parsedData["recipient"].(string)
    gatewayMessageId := parsedData["gateway_message_id"].(string)
  
    sendCallback("sent", sender, recipient, gatewayMessageId)
    sendCallback("delivered", sender, recipient, gatewayMessageId)
}

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:     "127.0.0.1:6379",
        Password: "",
        DB:       0,
    })
    for {
        data, err := rdb.RPop(ctx, "callback").Result()
        if err == redis.Nil {
            fmt.Println("Sleeping")
            time.Sleep(2 * time.Second) // sleep for 2s
            continue
        }
        go callback(data)
    }
}

我建议工作人员池解决方案。工作池应该有助于控制 CPU 和内存使用。由于上下文切换开销,大量进行 CPU 密集操作的 goroutines 并不是最佳的。但最重要的好处是可以控制 http 客户端。您的代码为从 redis 收到的每条消息创建一个 goroutine。然后这些 goroutines 发出 HTTP 请求。如果一段时间内从redis接收到的消息数高于目标服务所能处理的HTTP请求数,目标服务就会崩溃。或者,如果由于大量 goroutine 而达到内存限制,应用程序可能会崩溃。您可以设置 MaxConnsPerHost 来防止不受控制的连接创建,但不会阻止新 goroutine 的创建。在我的提议中,应用程序通过利用通道来调整处理速度以适应目标服务的能力。如果您发现目标服务可以处理更多请求并且您有空闲的 CPU 能力,则可以增加工作人员的数量。

type message struct {
    Sender           string `json:"sender"`
    Recipient        string `json:"recipient"`
    GatewayMessageID string `json:"gateway_message_id"`
}

func sendCallback(status string, m message) *http.Response {
    // data massaging
    ...
    response, err := http.Post(postURL, "application/json", responseBody)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("POST : %s\n", status)
    return response
}

func worker(messages chan []byte) {
    for rawMessage := range  messages {
        m := message{}
        err := json.Unmarshal(rawMessage, &m)
        if err != nil {
            log.Fatal(err)
        }
        sendCallback("sent", m)
        sendCallback("delivered", m)
    }
}

const numberOfWorkers = 10

func main() {
    rdb := redis.NewClient(&redis.Options{
        Addr:     "127.0.0.1:6379",
        Password: "",
        DB:       0,
    })
    messages := make(chan []byte, numberOfWorkers)
    for i := 0; i < numberOfWorkers; i++ {
        go worker(messages)
    }

    for {
        data, err := rdb.RPop(ctx, "callback").Result()
        if err == redis.Nil {
            fmt.Println("Sleeping")
            time.Sleep(2 * time.Second) // sleep for 2s
            continue
        } else if err != nil {
            log.Fatal(err)
        }
        messages <- []byte(data)
    }
}