Go:负 WaitGroup 计数器
Go: negative WaitGroup counter
我有点陌生,正在修改我在其他地方找到的代码以满足我的需要。正因为如此,我并不完全明白这里发生了什么,尽管我得到了大概的想法。
我是 运行 一些使用 go routines 的 websocket 客户端,但我遇到了导致程序崩溃的意外错误。当从 websocket 读取消息时出现错误(检查 readHandler 函数中的 conn.ReadMessage() 函数)时,我的程序似乎关闭了太多线程(如果这是错误的术语,请原谅)。关于如何解决此问题的任何想法?我真的很感激任何人花时间浏览它。提前致谢!
package main
import (
"context"
"fmt"
"os"
"time"
"os/signal"
"syscall"
"sync"
"net/url"
"github.com/gorilla/websocket"
"strconv"
"encoding/json"
"log"
"bytes"
"compress/gzip"
"io/ioutil"
)
// Structs
type Ping struct {
Ping int64 `json:"ping"`
}
type Pong struct {
Pong int64 `json:"pong"`
}
type SubParams struct {
Sub string `json:"sub"`
ID string `json:"id"`
}
func InitSub(subType string, pair string, i int) []byte {
var idInt string = "id" + strconv.Itoa(i)
subStr := "market." + pair + "." + subType
sub := &SubParams{
Sub: subStr,
ID: idInt,
}
out, err := json.MarshalIndent(sub, "", " ")
if err != nil {
log.Println(err);
}
//log.Println(string(out))
return out
}
// main func
func main() {
var server string = "api.huobi.pro"
pairs := []string{"btcusdt", "ethusdt", "ltcusdt"}
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for x, pair := range pairs {
wg.Add(1)
go control(server, "ws", pair, ctx, &wg, x+1)
}
<-comms
cancel()
wg.Wait()
}
func control(server string, path string, pair string, ctx context.Context, wg *sync.WaitGroup, i int) {
fmt.Printf("Started control for %s\n", server)
url := url.URL {
Scheme: "wss",
Host: server,
Path: path,
}
fmt.Println(url.String())
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
subscribe(conn, pair, i)
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go readHandler(ctx, conn, &localwg, server)
<- ctx.Done()
localwg.Wait()
wg.Done()
return
}
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for {
select {
case <- ctx.Done():
wg.Done()
return
default:
_, p, err := conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println(err)
}
r, err := gzip.NewReader(bytes.NewReader(p))
if(err == nil) {
result, err := ioutil.ReadAll(r)
if(err != nil) {
fmt.Println(err)
}
d := string(result)
fmt.Println(d)
var ping Ping
json.Unmarshal([]byte(d), &ping)
if (ping.Ping > 0) {
str := Pong{Pong: ping.Ping}
msg, err := json.Marshal(str)
if (err == nil) {
fmt.Println(string(msg))
conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
}
}
}
}
}
func subscribe(conn *websocket.Conn, pair string, id int) {
sub := string(InitSub("trade.detail", pair, id))
err := conn.WriteMessage(websocket.TextMessage, []byte(sub))
if err != nil {
panic(err)
}
}
连接失败时跳出readHandler
循环:
_, p, err := conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println(err)
return // <--- add this line
}
如果没有 return,该函数将在一个紧密的循环中旋转读取错误,直到出现恐慌。
在 goroutine 的开头使用 defer wg.Done()
以确保 Done 只被调用一次。
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
_, p, err := conn.ReadMessage()
if err != nil {
fmt.Println(err)
return
}
...
同时更新 control
函数。
因为调用者不与readHander
并发执行任何代码,运行没有任何价值readHandler
是一个goroutine。从 readHandler
中删除所有对等待组的引用并直接调用该函数:将 go readHandler(ctx, conn, &localwg, server)
更改为 readHandler(ctx, conn, server)
.
还有更多问题,但这应该会让您走得更远。
我有点陌生,正在修改我在其他地方找到的代码以满足我的需要。正因为如此,我并不完全明白这里发生了什么,尽管我得到了大概的想法。
我是 运行 一些使用 go routines 的 websocket 客户端,但我遇到了导致程序崩溃的意外错误。当从 websocket 读取消息时出现错误(检查 readHandler 函数中的 conn.ReadMessage() 函数)时,我的程序似乎关闭了太多线程(如果这是错误的术语,请原谅)。关于如何解决此问题的任何想法?我真的很感激任何人花时间浏览它。提前致谢!
package main
import (
"context"
"fmt"
"os"
"time"
"os/signal"
"syscall"
"sync"
"net/url"
"github.com/gorilla/websocket"
"strconv"
"encoding/json"
"log"
"bytes"
"compress/gzip"
"io/ioutil"
)
// Structs
type Ping struct {
Ping int64 `json:"ping"`
}
type Pong struct {
Pong int64 `json:"pong"`
}
type SubParams struct {
Sub string `json:"sub"`
ID string `json:"id"`
}
func InitSub(subType string, pair string, i int) []byte {
var idInt string = "id" + strconv.Itoa(i)
subStr := "market." + pair + "." + subType
sub := &SubParams{
Sub: subStr,
ID: idInt,
}
out, err := json.MarshalIndent(sub, "", " ")
if err != nil {
log.Println(err);
}
//log.Println(string(out))
return out
}
// main func
func main() {
var server string = "api.huobi.pro"
pairs := []string{"btcusdt", "ethusdt", "ltcusdt"}
comms := make(chan os.Signal, 1)
signal.Notify(comms, os.Interrupt, syscall.SIGTERM)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
for x, pair := range pairs {
wg.Add(1)
go control(server, "ws", pair, ctx, &wg, x+1)
}
<-comms
cancel()
wg.Wait()
}
func control(server string, path string, pair string, ctx context.Context, wg *sync.WaitGroup, i int) {
fmt.Printf("Started control for %s\n", server)
url := url.URL {
Scheme: "wss",
Host: server,
Path: path,
}
fmt.Println(url.String())
conn, _, err := websocket.DefaultDialer.Dial(url.String(), nil)
if err != nil {
panic(err)
}
subscribe(conn, pair, i)
defer conn.Close()
var localwg sync.WaitGroup
localwg.Add(1)
go readHandler(ctx, conn, &localwg, server)
<- ctx.Done()
localwg.Wait()
wg.Done()
return
}
func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) {
for {
select {
case <- ctx.Done():
wg.Done()
return
default:
_, p, err := conn.ReadMessage()
if err != nil {
wg.Done()
fmt.Println(err)
}
r, err := gzip.NewReader(bytes.NewReader(p))
if(err == nil) {
result, err := ioutil.ReadAll(r)
if(err != nil) {
fmt.Println(err)
}
d := string(result)
fmt.Println(d)
var ping Ping
json.Unmarshal([]byte(d), &ping)
if (ping.Ping > 0) {
str := Pong{Pong: ping.Ping}
msg, err := json.Marshal(str)
if (err == nil) {
fmt.Println(string(msg))
conn.WriteMessage(websocket.TextMessage, []byte(msg))
}
}
}
}
}
}
func subscribe(conn *websocket.Conn, pair string, id int) {
sub := string(InitSub("trade.detail", pair, id))
err := conn.WriteMessage(websocket.TextMessage, []byte(sub))
if err != nil {
panic(err)
}
}
连接失败时跳出
readHandler
循环:_, p, err := conn.ReadMessage() if err != nil { wg.Done() fmt.Println(err) return // <--- add this line }
如果没有 return,该函数将在一个紧密的循环中旋转读取错误,直到出现恐慌。
在 goroutine 的开头使用
defer wg.Done()
以确保 Done 只被调用一次。func readHandler(ctx context.Context, conn *websocket.Conn, wg *sync.WaitGroup, server string) { defer wg.Done() for { select { case <-ctx.Done(): return default: _, p, err := conn.ReadMessage() if err != nil { fmt.Println(err) return } ...
同时更新
control
函数。因为调用者不与
readHander
并发执行任何代码,运行没有任何价值readHandler
是一个goroutine。从readHandler
中删除所有对等待组的引用并直接调用该函数:将go readHandler(ctx, conn, &localwg, server)
更改为readHandler(ctx, conn, server)
.
还有更多问题,但这应该会让您走得更远。