如何在没有比赛的情况下延长自动收报机持续时间?

How to extend the ticker duration without a race?

我正在尝试实现一个 keepAlive 机制。问题是我不知道如何在没有比赛的情况下替换保持活动代码 (conn.keepAlive),因为 keepAlive() 方法总是从代码中读取。

//errors not handled for brevity
const interval = 10 * time.Second

type conn struct {
    keepAlive time.Ticker
    conn      net.Conn
    mux       sync.Mutex
}

// replace replaces the underlying connection
func (cn conn) replace(newcn net.Conn) {
    cn.mux.Lock()
    cn.conn = newcn
    // reset the ticker
    cn.keepAlive.Stop
    cn.keepAlive = time.NewTicker(interval)
    cn.mux.Unlock()
}

func (cn conn) keepAlive() {
    for {
        <-cn.keepAlive.C
        cn.mux.Lock()
        cn.conn.Write([]byte("ping"))
        var msg []byte
        cn.conn.Read(msg)
        if string(msg) != "pong" {
            // do some mean stuff
        }
        cn.keepAlive = time.NewTicker(interval)
        cn.mux.Unlock()
    }
}

我最终得到了下面的代码。我对它的外观不太满意,但它确实有效。基本上我将多路复用器包裹在一个通道中,这样我就可以在上面做一个select。

const interval = 10 * time.Second

type conn struct {
    keepAlive time.Ticker
    conn      *net.Conn
    mux       sync.Mutex
}

// replace replaces the underlying connection
func (cn conn) replace(newcn *net.Conn) {
    cn.mux.Lock()
    cn.conn = newcn
    // reset the ticker
    cn.keepAlive.Stop
    cn.keepAlive = time.NewTicker(interval)
    cn.mux.Unlock()
}

func (cn conn) keepAlive() {
    lockerFn := func() <-chan struct{} {
        cn.mux.Lock()
        ch = make(chan struct{})
        go func() {
            ch <- struct{}{}
        }()
        return ch
    }
    for {
        locker := lockerFn()
        select {
        case <-cn.keepAlive.C:
            // unlock the locker otherwise we
            // get stuck
            go func() {
                <-locker
                cn.mux.Unlock()
            }()
        case <-locker:
            cn.conn.Write([]byte("ping"))
            var msg []byte
            cn.conn.Read(msg)
            cn.keepAlive = time.NewTicker(interval)
            cn.mux.Unlock()
        }
    }
}

一种更简洁地实现这一点的方法是使用通道作为同步机制,而不是互斥体:

type conn struct {
    sync.Mutex
    conn        net.Conn
    replaceConn chan net.Conn
}

// replace replaces the underlying connection
func (cn *conn) replace(newcn net.Conn) {
    cn.replaceConn <- newcn
}

func (cn *conn) keepAlive() {
    t := time.NewTicker(interval)
    msg := make([]byte, 10)

    for {
        select {
        case <-t.C:
        case newConn := <-cn.replaceConn:
            cn.Lock()
            cn.conn = newConn
            cn.Unlock()
            continue
        }

        cn.Lock()
        _ = msg
        // do keepalive
        cn.Unlock()
    }
}