从 websocket 客户端获取通道值

Getting channel value from a websocket client

我正在运行一个 websocket 客户端,想把客户端收到的响应传递到一个通道,以便在主文件中使用。目前,通道只返回一次 nil 值,之后就再也没有任何内容了。向通道传值时似乎出了问题。能帮帮我吗?这是我目前的代码:

package main

import (
    "context"
    "fmt"
    "kraken_client/stored_data"
    "kraken_client/ws_client"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
)

// main connects to the public websocket, subscribes to the OHLC stream for the
// configured pairs, prints every value received on the channel, and shuts
// down when the process receives an interrupt or SIGTERM.
func main() {
    // check if in production or testing mode & find base currency
    testing := true
    isTesting(os.Args, &testing, &stored_data.Base_currency)

    // shutdown signal handling
    comms := make(chan os.Signal, 1)
    signal.Notify(comms, os.Interrupt, syscall.SIGTERM)

    // set ohlc interval and pairs
    ohlcInterval := 5
    pairs := []string{"BTC/" + stored_data.Base_currency, "EOS/" + stored_data.Base_currency}

    // create ws connections
    pubSocket, err := ws_client.ConnectToServer("public", testing)
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }

    // The context is created only after the os.Exit path above, so the
    // deferred cancel is never skipped by an early exit.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    var wg sync.WaitGroup

    // Start the consumer before anything can be sent: ch is unbuffered, so a
    // send blocks until a receiver is ready.
    ch := make(chan interface{})
    go func() {
        for c := range ch {
            fmt.Println(c)
        }
    }()

    // listen to websocket connections
    wg.Add(1)
    go pubSocket.PubListen(ctx, &wg, ch, testing)

    // subscribe to a stream
    pubSocket.SubscribeToOHLC(pairs, ohlcInterval)

    <-comms
    cancel()
    wg.Wait()
    // ch is deliberately not closed here: main is the receiver, and a send on
    // a closed channel panics. The consumer goroutine ends with the process.
}

PubListen 函数的实现如下:

// PubListen installs a text-message handler on the socket, sends the current
// value of res on ch once, and then blocks until ctx is cancelled.
//
// wg.Done and socket.Close are deferred, so both run when PubListen returns.
// testing is passed through to pubJsonDecoder.
func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, ch chan interface{}, testing bool) {
    defer wg.Done()
    defer socket.Close()

    // Shared between the handler below (which writes it) and the send further
    // down (which reads it).
    var res interface{}

    // This only assigns the handler; its body runs later, each time the
    // handler is called with a message.
    socket.OnTextMessage = func(message string, socket Socket) {
        //log.Println(message)
        res = pubJsonDecoder(message, testing) // this function decodes the message and returns an interface
        log.Println(res) // this is printing the correctly decoded value.

    }

    // This send is an ordinary statement in PubListen's own body, so it
    // executes once per call to PubListen, with whatever res holds right now.
    ch <- res
    log.Println(res) // does not print a value
    log.Println(ch) // does not print a value

    // Block until the context is cancelled.
    <-ctx.Done()
    log.Println("closing public socket")
    return
}

我做错了什么?

问题中的代码只在 PubListen 中执行了一次 ch <- res 语句,而且是在 OnTextMessage 回调设置 res 之前执行的,因此通道只会收到一个 nil 值。

要为每条消息都向 ch 发送一个值,请将 ch <- res 这一行移到 OnTextMessage 函数内部。每收到一条消息,该函数就会被调用一次。

// PubListen forwards every decoded text message received on the socket to ch
// and blocks until ctx is cancelled.
//
// wg.Done and socket.Close are deferred, so both run when PubListen returns.
// testing is passed through to pubJsonDecoder. ch is never closed here.
func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, ch chan interface{}, testing bool) {
    defer wg.Done()
    defer socket.Close()

    // The handler runs once per incoming text message. Its second parameter
    // is unused and left unnamed so it does not shadow the receiver.
    socket.OnTextMessage = func(message string, _ Socket) {
        res := pubJsonDecoder(message, testing)
        select {
        case ch <- res:
            log.Println(res)
        case <-ctx.Done():
            // Shutting down: drop the message rather than block forever on a
            // channel that may no longer have a receiver.
        }
    }

    <-ctx.Done()
    log.Println("closing public socket")
}