在 select 循环中重置 timer.NewTimer

Go reset a timer.NewTimer within select loop

我有一个场景,我正在处理通道上的事件,其中一个事件是需要在特定时间范围内发生的心跳。不是心跳的事件将继续消耗计时器,但是无论何时收到心跳,我都想重置计时器。最明显的方法是使用 time.NewTimer.

例如:

func main() {
    to := time.NewTimer(3200 * time.Millisecond)
    for {
        select {
        case event, ok := <-c:
            if !ok {
                return
            } else if event.Msg == "heartbeat" {
                to.Reset(3200 * time.Millisecond)               
            }
        case remediate := <-to.C:
            fmt.Println("do some stuff ...")
            return
        }
    }
}

请注意,time.Ticker 在这里不起作用,因为只有在未收到检测信号时才应触发修复,而不是每次都触发。

上面的解决方案在我试过的少数低容量测试中有效,但是我遇到了一个 Github 问题,表明 resetting a Timer which has not fired is a no-no。此外,文档指出:

Reset should be invoked only on stopped or expired timers with drained channels. If a program has already received a value from t.C, the timer is known to have expired and the channel drained, so t.Reset can be used directly. If a program has not yet received a value from t.C, however, the timer must be stopped and—if Stop reports that the timer expired before being stopped—the channel explicitly drained:

if !t.Stop() {
    <-t.C
}
t.Reset(d)

这让我停顿了一下,因为它似乎准确地描述了我正在尝试做的事情。每当接收到心跳时,我都会在它被触发之前重置 Timer 。我对 Go 还没有足够的经验来消化整个 post,但看起来我可能正走在一条危险的道路上。

我想到的另一种解决方案是在心跳发生时将 Timer 简单地替换为新的,例如:

else if event.Msg == "heartbeat" {
                to = time.NewTimer(3200 * time.Millisecond)               
            }

起初我担心重新绑定 to = time.NewTimer(3200 * time.Millisecond) 在 select:

中不可见

For all the cases in the statement, the channel operands of receive operations and the channel and right-hand-side expressions of send statements are evaluated exactly once, in source order, upon entering the "select" statement. The result is a set of channels to receive from or send to, and the corresponding values to send.

但在这种特殊情况下,由于我们处于循环中,我希望在每次迭代时我们重新输入 select,因此新绑定应该是可见的。这是一个公平的假设吗?

我意识到那里有类似的问题,我已经尝试阅读相关的 posts/documentation,但我是 Go 的新手,只是想确保我在这里理解正确。

所以我的问题是:

附录


进一步阅读后,问题中概述的大多数陷阱都描述了计时器已经触发(将结果放在通道上)的场景,并且在触发之后一些其他进程尝试重置它。对于这种狭窄的情况,我理解需要使用 !t.Stop() 进行测试,因为 Stop 的错误 return 表示计时器已经触发,因此必须在调用 Reset 之前耗尽。

我仍然不明白的是,为什么在 t.Reset() 之前调用 t.Stop(),而 Timer 尚未触发。据我所知,None 个例子涉及到这个问题。

What I still do not understand, is why it is necessary to call t.Stop() prior to t.Reset(), when the Timer has yet to fire.

“当计时器尚未触发时”位在这里很关键。计时器在一个单独的 go 例程(runtime 的一部分)中触发,这可能随时发生。您无法知道在您调用 to.Reset(3200 * time.Millisecond) 时计时器是否已触发(它甚至可能在该函数处于 运行ning 时触发!)。

这里有一个例子可以证明这一点,它与您正在尝试的有点相似(基于 this):


func main() {
    eventC := make(chan struct{}, 1)
    go keepaliveLoop(eventC )

    // Reset the timer 1000 (approx) times; once every millisecond (approx)
    // This should prevent the timer from firing (because that only happens after 2 ms)
    for i := 0; i < 1000; i++ {
        time.Sleep(time.Millisecond)
        // Don't block if there is already a reset request
        select {
        case eventC <- struct{}{}:
        default:
        }
    }
}

func keepaliveLoop(eventC chan struct{}) {
    to := time.NewTimer(2 * time.Millisecond)

    for {
        select {
        case <-eventC: 
            //if event.Msg == "heartbeat"...
            time.Sleep(3 * time.Millisecond) // Simulate reset work (delay could be partly dur to whatever is triggering the
            to.Reset(2 * time.Millisecond)
        case <-to.C:
            panic("this should never happen")
        }
    }
}

playground 中尝试。

由于 time.Sleep(3 * time.Millisecond),这可能显得人为,但这只是为了始终如一地证明问题。您的代码可能在 99.9% 的时间内都有效,但事件通道和计时器通道总是有可能在 select 变为 运行 之前触发(其中 random case 将 运行) 或者 while case event, ok := <-c: 块中的代码是 运行ning(包括 while Reset() is in progress)。发生这种情况的结果是意外调用 remediate 代码(这可能不是什么大问题)。

幸运的是解决这个问题相对容易(遵循 documentation 中的建议):

time.Sleep(3 * time.Millisecond) // Simulate reset work (delay could be partly dur to whatever is triggering the
if !to.Stop() {
    <-to.C
}
to.Reset(2 * time.Millisecond)

the playground 试试这个。

这是有效的,因为 to.Stop returns "true if the call stops the timer, false if the timer has already expired or been stopped". Note that things get a more complicated if the timer is used in multiple go-routines“这不能与来自定时器通道的其他接收或对定时器停止方法的其他调用同时完成”,但在您的使用中情况并非如此-案例.

Is my use of timer.Reset() unsafe, or are the cases mentioned in the Github issue highlighting other problems which are not applicable here?

是的 - 这是不安全的。然而,影响相当低。事件到达和计时器触发需要几乎同时发生,在这种情况下,运行 宁 remediate 代码可能不是一个大问题。请注意,修复非常简单(根据文档)

If it is unsafe, is my second proposed solution acceptable (rebinding the timer on each iteration).

您提出的第二个解决方案也有效(但请注意,垃圾收集器在触发或停止之前无法释放计时器,如果您快速创建计时器,这可能会导致问题)。

注:回复@JotaSantos的建议

Another thing that could be done is to add a select when draining <-to.C (on the Stop "if") with a default clause. That would prevent the pause.

请参阅 this comment 了解为什么这可能不是一个好方法的详细信息(在您的情况下也没有必要)。

我遇到过类似的问题。在阅读了很多信息之后,我想出了一个解决方案,大致如下:

package main

import (
    "fmt"
    "time"
)

func main() {
    const timeout = 2 * time.Second

    // Prepare a timer that is stopped and ready to be reset.
    // Stop will never return false, because an hour is too long
    // for timer to fire. Thus there's no need to drain timer.C.
    timer := time.NewTimer(timeout)
    timer.Stop()

    // Make sure to stop the timer when we return.
    defer timer.Stop()

    // This variable is needed because we need to track if we can safely reset the timer
    // in a loop. Calling timer.Stop() will return false on every iteration, but we can only
    // drain the timer.C once, otherwise it will deadlock.
    var timerSet bool

    c := make(chan time.Time)

    // Simulate events that come in every second
    // and every 5th event delays so that timer can fire.
    go func() {
        var i int
        ticker := time.NewTicker(1 * time.Second)
        defer ticker.Stop()

        for t := range ticker.C {
            i++
            if i%5 == 0 {
                fmt.Println("Sleeping")
                time.Sleep(3 * time.Second)
            }

            c <- t

            if i == 20 {
                break
            }
        }
        close(c)
    }()

    for {
        select {
        case t, ok := <-c:
            if !ok {
                fmt.Println("Closed channel")
                return
            }
            fmt.Println("Got event", t, timerSet)

            // We got an event, and timer was already set.
            // We need to stop the timer and drain the channel if needed,
            // so that we can safely reset it later.
            if timerSet {
                if !timer.Stop() {
                    <-timer.C
                }
                timerSet = false
            }

            // If timer was not set, or it was stopped before, it's safe to reset it.
            if !timerSet {
                timerSet = true
                timer.Reset(timeout)
            }
        case remediate := <-timer.C:
            fmt.Println("Timeout", remediate)
            // It's important to store that timer is not set anymore.
            timerSet = false
        }
    }
}

Link 去游乐场:https://play.golang.org/p/0QlujZngEGg