Golang Gorilla Websocket 在 120 秒时停止接收信息

Golang Gorilla Websocket stops receiving information at 120 seconds

我目前正在尝试连接到 CEX.IO 比特币交易所的 websocket,但不仅与 CEX.IO 有问题,而且与其他人也有问题。我的所有连接都在 120 秒左右下降,这让我觉得存在一些 TTL 问题。主包中的 Process() goroutine 最终只是挂起并等待来自 readLoop 的数据,而 readLoop 只是停止接收数据。我在代码中包含了一些只读 API 键,因此您可以根据需要进行测试。

package main

import (
  "fmt"
  "bitbucket.org/tradedefender/cryptocurrency/exchange-connector/cexio"
  "github.com/shopspring/decimal"
  "encoding/json"
  "time"
)

type OrderBook struct {
  Asks []Ask
  Bids []Bid
}

type Ask struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

type Bid struct {
  Rate    decimal.Decimal
  Amount  decimal.Decimal
}

func main() {
  cexioConn := new(cexio.Connection)

  err := cexioConn.Connect()
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  err = cexioConn.Authenticate("TLwYkktLf7Im6nqSKt6UO1IrU", "9ImOJcR7Qj3LMIyPCzky0D7WE")
  if err != nil {
    fmt.Errorf("error: %s", err.Error())
  }

  readChannel := make(chan cexio.IntraAppMessage, 25)

  go cexioConn.ReadLoop(readChannel)

  processor := Processor{
    WatchPairs: [][2]string{
      [2]string{
        "BTC", "USD",
      },
    },
    conn: cexioConn,
  }

  go processor.Process(readChannel)

  // LOL
  for {
    continue
  }

}

type Processor struct {
  WatchPairs [][2]string
  conn *cexio.Connection
}

func (p *Processor) Process(ch <-chan cexio.IntraAppMessage) {

  p.conn.SubscribeToOrderBook(p.WatchPairs[0])

  pingTimer := time.Now().Unix()
  for {

    fmt.Printf("(%v)\n", time.Now().Unix())

    if (time.Now().Unix() - pingTimer) >= 10 {
      fmt.Println("sending ping")
      p.conn.SendPing()
      pingTimer = time.Now().Unix()
    }

    readMsg := <- ch
    output, _ := json.Marshal(readMsg.SocketMessage)
    fmt.Println(string(output))

    if readMsg.SocketMessage.Event == "ping" {
      fmt.Println("sending pong")
      p.conn.SendPong()
      pingTimer = time.Now().Unix()
    }

  }
}

下面是 cexio websocket 的连接器。这是他们 API 的 link:https://cex.io/websocket-api

package cexio

import (
  "github.com/gorilla/websocket"
  //"github.com/shopspring/decimal"
  "github.com/satori/go.uuid"
  "encoding/hex"
  "encoding/json"
  "crypto/hmac"
  "crypto/sha256"
  "bytes"
  "strconv"
  "time"
  "fmt"
)

const Url = "wss://ws.cex.io/ws/"

type Connection struct {
  conn *websocket.Conn
}

type IntraAppMessage struct {
  SocketMessage   GenericMessage
  ProgramMessage  ProgramMessage
}

type GenericMessage struct {
  Event   string      `json:"e"`
  Data    interface{} `json:"data"`
  Auth    AuthData    `json:"auth,omitempty"`
  Ok      string      `json:"ok,omitempty"`
  Oid     string      `json:"oid,omitempty"`
  Time    int64       `json:"time,omitempty"`
}

type ProgramMessage struct {
  Error   string
}

type AuthData struct {
  Key       string  `json:"key"`
  Signature string  `json:"signature"`
  Timestamp int64   `json:"timestamp"`
}

type OrderBookSubscribeData struct {
  Pair      [2]string   `json:"pair"`
  Subscribe bool        `json:"subscribe"`
  Depth     int         `json:"depth"`
}

func (c *Connection) SendPong() error {

  pongMsg := GenericMessage{
    Event: "pong",
  }

  err := c.conn.WriteJSON(pongMsg)
  if err != nil {
    return nil
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PongMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) SendPing() error {

  pingMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(pingMsg)
  if err != nil {
    return err
  }

  deadline := time.Now().Add(15*time.Second)

  err = c.conn.WriteControl(websocket.PingMessage, nil, deadline)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) Connect() error {
  dialer := *websocket.DefaultDialer
  wsConn, _, err := dialer.Dial(Url, nil)
  if err != nil {
    return err
  }

  c.conn = wsConn
  //c.conn.SetPingHandler(c.HandlePing)

  for {

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "connected" {
      c.Disconnect()
      return err
    } else {
      break
    }

  }

  return nil
}

func (c *Connection) Disconnect() error {
  return c.conn.Close()
}

func (c *Connection) ReadLoop(ch chan<- IntraAppMessage) {
  for {

    fmt.Println("starting new read")

    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      ch <- IntraAppMessage{
        ProgramMessage: ProgramMessage{
          Error: err.Error(),
        },
      }
      continue
    }

    ch <- IntraAppMessage{
      SocketMessage: m,
    }

  }
}

func CreateSignature(timestamp int64, key, secret string) string {
  secretBytes := []byte(secret)
  h := hmac.New(sha256.New, secretBytes)

  var buffer bytes.Buffer
  buffer.WriteString(strconv.FormatInt(timestamp, 10))
  buffer.WriteString(key)

  h.Write(buffer.Bytes())

  return hex.EncodeToString(h.Sum(nil))
}

func (c *Connection) Authenticate(key, secret string) error {

  timestamp := time.Now().Unix()
  signature := CreateSignature(timestamp, key, secret)

  var authMsg GenericMessage
  authMsg.Event = "auth"
  authMsg.Auth = AuthData{
    Key: key,
    Signature: signature,
    Timestamp: timestamp,
  }

  err := c.conn.WriteJSON(authMsg)
  if err != nil {
    return err
  }

  for {
    _, msgBytes, err := c.conn.ReadMessage()
    if err != nil {
      c.Disconnect()
      return err
    }

    fmt.Println(string(msgBytes))

    var m GenericMessage
    err = json.Unmarshal(msgBytes, &m)
    if err != nil {
      c.Disconnect()
      return err
    }

    if m.Event != "auth" && m.Ok != "ok" {
      c.Disconnect()
      return err
    } else {
      break
    }
  }

  return nil

}

func (c *Connection) SubscribeToOrderBook(pair [2]string) error {

  sendMsg := GenericMessage{
    Event: "order-book-subscribe",
    Data: OrderBookSubscribeData{
      Pair: pair,
      Subscribe: true,
      Depth: 0,
    },
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}

func (c *Connection) GetBalance() error {

  sendMsg := GenericMessage{
    Event: "get-balance",
    Oid: uuid.NewV4().String(),
  }

  err := c.conn.WriteJSON(sendMsg)
  if err != nil {
    return err
  }

  return nil

}

解决方案是删除

for { 
  continue 
}

主函数结束