添加 goroutine 并发以提高代码效率
Adding goroutine concurrency to make code efficient
func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
// data massaging
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil{
panic(err)
}
fmt.Printf("POST : %s\n", status)
return response
}
func callback(rdb *redis.Client) {
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
}
// more work
sendCallback(status, sender, recipient, gatewayMessageId)
}
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
callback(rdb)
}
我知道上面的代码有缺陷,因为我正在等待处理“数据”。但是,我希望它是非阻塞的,并且一旦数据出现,我就希望对其进行处理。我一直在阅读关于 goroutines 的教程,但我无法理解它。
编辑
根据@torek 的解释,如果我将无限循环从回调函数中取出,在主函数中执行,并让回调专注于它的部分,那么 goroutine 应该如何工作?
func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
// data massaging
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil{
panic(err)
}
fmt.Printf("POST : %s\n", status)
return response
}
func callback(data string) {
parsedData := make(map[string]interface{})
err := json.Unmarshal([]byte(data), &parsedData)
if err != nil {
panic(err)
}
sender := parsedData["sender"].(string)
recipient := parsedData["recipient"].(string)
gatewayMessageId := parsedData["gateway_message_id"].(string)
sendCallback("sent", sender, recipient, gatewayMessageId)
sendCallback("delivered", sender, recipient, gatewayMessageId)
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
}
go callback(data)
}
}
我建议工作人员池解决方案。工作池应该有助于控制 CPU 和内存使用。由于上下文切换开销,大量进行 CPU 密集操作的 goroutines 并不是最佳的。但最重要的好处是可以控制 http 客户端。您的代码为从 redis 收到的每条消息创建一个 goroutine。然后这些 goroutines 发出 HTTP 请求。如果一段时间内从redis接收到的消息数高于目标服务所能处理的HTTP请求数,目标服务就会崩溃。或者,如果由于大量 goroutine 而达到内存限制,应用程序可能会崩溃。您可以设置 MaxConnsPerHost
来防止不受控制的连接创建,但不会阻止新 goroutine 的创建。在我的提议中,应用程序通过利用通道来调整处理速度以适应目标服务的能力。如果您发现目标服务可以处理更多请求并且您有空闲的 CPU 能力,则可以增加工作人员的数量。
type message struct {
Sender string `json:"sender"`
Recipient string `json:"recipient"`
GatewayMessageID string `json:"gateway_message_id"`
}
func sendCallback(status string, m message) *http.Response {
// data massaging
...
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil {
log.Fatal(err)
}
fmt.Printf("POST : %s\n", status)
return response
}
func worker(messages chan []byte) {
for rawMessage := range messages {
m := message{}
err := json.Unmarshal(rawMessage, &m)
if err != nil {
log.Fatal(err)
}
sendCallback("sent", m)
sendCallback("delivered", m)
}
}
const numberOfWorkers = 10
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
messages := make(chan []byte, numberOfWorkers)
for i := 0; i < numberOfWorkers; i++ {
go worker(messages)
}
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
} else if err != nil {
log.Fatal(err)
}
messages <- []byte(data)
}
}
func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
// data massaging
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil{
panic(err)
}
fmt.Printf("POST : %s\n", status)
return response
}
func callback(rdb *redis.Client) {
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
}
// more work
sendCallback(status, sender, recipient, gatewayMessageId)
}
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
callback(rdb)
}
我知道上面的代码有缺陷,因为我正在等待处理“数据”。但是,我希望它是非阻塞的,并且一旦数据出现,我就希望对其进行处理。我一直在阅读关于 goroutines 的教程,但我无法理解它。
编辑
根据@torek 的解释,如果我将无限循环从回调函数中取出,在主函数中执行,并让回调专注于它的部分,那么 goroutine 应该如何工作?
func sendCallback(status string, sender string, recipient string, gatewayMessageId string) *http.Response{
// data massaging
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil{
panic(err)
}
fmt.Printf("POST : %s\n", status)
return response
}
func callback(data string) {
parsedData := make(map[string]interface{})
err := json.Unmarshal([]byte(data), &parsedData)
if err != nil {
panic(err)
}
sender := parsedData["sender"].(string)
recipient := parsedData["recipient"].(string)
gatewayMessageId := parsedData["gateway_message_id"].(string)
sendCallback("sent", sender, recipient, gatewayMessageId)
sendCallback("delivered", sender, recipient, gatewayMessageId)
}
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
}
go callback(data)
}
}
我建议工作人员池解决方案。工作池应该有助于控制 CPU 和内存使用。由于上下文切换开销,大量进行 CPU 密集操作的 goroutines 并不是最佳的。但最重要的好处是可以控制 http 客户端。您的代码为从 redis 收到的每条消息创建一个 goroutine。然后这些 goroutines 发出 HTTP 请求。如果一段时间内从redis接收到的消息数高于目标服务所能处理的HTTP请求数,目标服务就会崩溃。或者,如果由于大量 goroutine 而达到内存限制,应用程序可能会崩溃。您可以设置 MaxConnsPerHost
来防止不受控制的连接创建,但不会阻止新 goroutine 的创建。在我的提议中,应用程序通过利用通道来调整处理速度以适应目标服务的能力。如果您发现目标服务可以处理更多请求并且您有空闲的 CPU 能力,则可以增加工作人员的数量。
type message struct {
Sender string `json:"sender"`
Recipient string `json:"recipient"`
GatewayMessageID string `json:"gateway_message_id"`
}
func sendCallback(status string, m message) *http.Response {
// data massaging
...
response, err := http.Post(postURL, "application/json", responseBody)
if err != nil {
log.Fatal(err)
}
fmt.Printf("POST : %s\n", status)
return response
}
func worker(messages chan []byte) {
for rawMessage := range messages {
m := message{}
err := json.Unmarshal(rawMessage, &m)
if err != nil {
log.Fatal(err)
}
sendCallback("sent", m)
sendCallback("delivered", m)
}
}
const numberOfWorkers = 10
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "",
DB: 0,
})
messages := make(chan []byte, numberOfWorkers)
for i := 0; i < numberOfWorkers; i++ {
go worker(messages)
}
for {
data, err := rdb.RPop(ctx, "callback").Result()
if err == redis.Nil {
fmt.Println("Sleeping")
time.Sleep(2 * time.Second) // sleep for 2s
continue
} else if err != nil {
log.Fatal(err)
}
messages <- []byte(data)
}
}