Golang:节流(时间延迟)功能在 goroutine 中不起作用(在主线程中工作正常)

Golang: throttle (time delay) function is not working in goroutine (works fine in main thread)

所以我正在编写一个实用程序来查询工作中的 API,他们每 10 秒限制到 20 个调用。很简单,我会将我的通话时间限制在自上次通话后至少 0.5 秒。在我尝试使用 goroutine 之前,我的 Throttle 实用程序运行良好。

现在我正在使用 struct/method 组合:

func (c *CTKAPI) Throttle() {
if c.Debug{fmt.Println("\t\t\tEntering Throttle()")}
for { //in case something else makes a call while we're sleeping, we need to re-check
    if t := time.Now().Sub(c.LastCallTime); t < c.ThrottleTime {
        if c.Debug{fmt.Printf("\t\t\tThrottle: Sleeping %v\n", c.ThrottleTime - t)}
        time.Sleep(c.ThrottleTime - t)
    } else {
        if c.Debug{fmt.Println("\t\t\tThrottle: Released.")}
        break
    }
}
c.LastCallTime = time.Now()
if c.Debug{fmt.Println("\t\t\tExiting Throttle()")}

}

然后我在每个 goroutine 中的每次调用之前调用 whatever.Throttle() 以确保我在启动下一个调用之前至少等待了半秒。

但这似乎不可靠,并且会产生不可预测的结果。是否有更优雅的方式来限制并发请求?

-迈克

因为您引入了数据竞争,多个例程正在访问/更改 c.LastCallTime。

您可以使用 time.Tick 或将 c.LastCallTime 设为 int64 (c.LastCallTime = time.Now().Unix()) 并使用 atomic.LoadInt64/StoreInt64 进行检查。

回答我自己的问题,因为我昨晚很晚偶然发现了一些解决方案。 对我来说最简单的答案是使用 sync.Mutex 变量来锁定和解锁油门功能,以确保我不会同时不小心击中它。另一个选择是将我的节流服务移动到它自己的 goroutine 函数中(从而消除并发调用)并与通道通信 throttled/OK,但对于这个应用程序,Mutex 是一个更干净的解决方案。 以下是寻找类似解决方案的工作代码的简化版本:

package main

import (
    "fmt"
    "time"
    "sync"
)

type tStruct struct {
    delay time.Duration
    last time.Time
    lock sync.Mutex //this will be our locking variable
}

func (t *tStruct) createT() *tStruct {
    return &tStruct {
        delay: 500*time.Millisecond,
        last: time.Now(),
    }
}

func (t *tStruct) throttle(th int) {
    //we lock our function, and any other routine calling this function will block.
    t.lock.Lock()
    //and we'll defer an unlock, so when we exit the throttle, we'll be ready for another call.
    defer t.lock.Unlock()
    fmt.Printf("\tThread %v Entering Throttle Check.\n", th)
    defer fmt.Printf("\tThread %v Leaving Throttle Check.\n", th)
    for {
        p := time.Now().Sub(t.last)
        if p < t.delay {
            fmt.Printf("\tThread %v Sleeping %v.\n", th, t.delay-p)
            time.Sleep(t.delay-p)
        } else {
            fmt.Printf("\tThread %v No longer Throttled.\n", th)
            t.last = time.Now()
            break
        }
    }
}

func (t *tStruct) worker(rch <-chan string, sch chan<- string, th int) {
    fmt.Printf("Thread %v starting up.\n", th)
    defer fmt.Printf("Thread %v Dead.\n", th)
    sch <-"READY"
    for {
        r := <-rch
        fmt.Printf("Thread %v received %v\n", th, r)
        switch r {
            case "STOP":
                fmt.Printf("Thread %v returning.\n", th)
                sch <-"QUITTING"
                return
            default:
                fmt.Printf("Thread %v processing %v.\n", th, r)
                t.throttle(th)
                fmt.Printf("Thread %v done with %v.\n", th, r)
                sch <-"OK"
        }
    }
}

func main() {
    ts := tStruct{}
    ts.delay = 500*time.Millisecond
    ts.last = time.Now()
    sch := make(chan string)
    rch := make(chan string)
    tC := 3
    tA := 0

    fmt.Println("Starting Threads")
    for i:=1; i<(tC+1); i++ {
        go ts.worker(sch, rch, i)
        r := <-rch
        if r=="READY" {
            tA++
        } else {
            fmt.Println("ERROR not READY")
        }
    }

    fmt.Println("Feeding All Threads")
    for i:=1; i<(tC+1); i++ {
        sch <- "WORK"
    }

    fmt.Println("Listening for threads")
    for tA > 0{
        r := <-rch
        switch r {
            case "QUITTING":
                tA--
                fmt.Println("main received QUITTING")
                continue
            case "OK":
                fmt.Println("main received OK")
                sch <-"STOP"
                continue
            default:
                fmt.Println("Shouldn't be here!!!")
        }
    }
}

实际上有一种更简单的方法可以做到这一点:create a time ticker

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    rateLimit := time.Tick(500 * time.Millisecond)
    <-rateLimit

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(i int) {
            <-rateLimit
            fmt.Println("Hello", i)
            wg.Done()
        }(i)
    }
    wg.Wait()
}

您的新代码更好。正如另一个答案中提到的,你参加了一场比赛。 Go 有一个内置的竞争检测器 go build -race。这是一个了不起的工具,并且可以通过良好的单元测试找到适合您的比赛。

我认为您最初的假设之一是有缺陷的。通过调整所有 API 呼叫的节奏,您可以消除任何突发的机会。在您的方案中,每个 API 调用都会受到延迟影响,即使它可能不需要。除非您确定每个 API 调用都会达到极限,否则有更好的方法。

启动一个 time.NewTicker 到 10 秒并将计数器初始化为 0。为每个 API 请求增加计数器。如果计数器达到 20,则 goroutine 会休眠直到计时器关闭。当计时器关闭时重置计数器并继续休眠 goroutines。

我一直想编写一个 API 速率限制器,所以我编写了它,您可以在这里看到它: https://github.com/tildeleb/limiter/blob/master/limiter.go

除了例子,它是未经测试的。任何反馈,请在 github.

上创建问题