Golang:节流(时间延迟)功能在 goroutine 中不起作用(在主线程中工作正常)
Golang: throttle (time delay) function is not working in goroutine (works fine in main thread)
所以我正在编写一个实用程序来查询工作中的 API,他们每 10 秒限制到 20 个调用。很简单,我会将我的通话时间限制在自上次通话后至少 0.5 秒。在我尝试使用 goroutine 之前,我的 Throttle 实用程序运行良好。
现在我正在使用 struct/method 组合:
func (c *CTKAPI) Throttle() {
if c.Debug{fmt.Println("\t\t\tEntering Throttle()")}
for { //in case something else makes a call while we're sleeping, we need to re-check
if t := time.Now().Sub(c.LastCallTime); t < c.ThrottleTime {
if c.Debug{fmt.Printf("\t\t\tThrottle: Sleeping %v\n", c.ThrottleTime - t)}
time.Sleep(c.ThrottleTime - t)
} else {
if c.Debug{fmt.Println("\t\t\tThrottle: Released.")}
break
}
}
c.LastCallTime = time.Now()
if c.Debug{fmt.Println("\t\t\tExiting Throttle()")}
}
然后我在每个 goroutine 中的每次调用之前调用 whatever.Throttle() 以确保我在启动下一个调用之前至少等待了半秒。
但这似乎不可靠,并且会产生不可预测的结果。是否有更优雅的方式来限制并发请求?
-迈克
因为您引入了数据竞争,多个例程正在访问/更改 c.LastCallTime。
您可以使用 time.Tick
或将 c.LastCallTime
设为 int64
(c.LastCallTime = time.Now().Unix()
) 并使用 atomic.LoadInt64/StoreInt64
进行检查。
回答我自己的问题,因为我昨晚很晚偶然发现了一些解决方案。
对我来说最简单的答案是使用 sync.Mutex 变量来锁定和解锁油门功能,以确保我不会同时不小心击中它。另一个选择是将我的节流服务移动到它自己的 goroutine 函数中(从而消除并发调用)并与通道通信 throttled/OK,但对于这个应用程序,Mutex 是一个更干净的解决方案。
以下是寻找类似解决方案的工作代码的简化版本:
package main
import (
"fmt"
"time"
"sync"
)
type tStruct struct {
delay time.Duration
last time.Time
lock sync.Mutex //this will be our locking variable
}
func (t *tStruct) createT() *tStruct {
return &tStruct {
delay: 500*time.Millisecond,
last: time.Now(),
}
}
func (t *tStruct) throttle(th int) {
//we lock our function, and any other routine calling this function will block.
t.lock.Lock()
//and we'll defer an unlock, so when we exit the throttle, we'll be ready for another call.
defer t.lock.Unlock()
fmt.Printf("\tThread %v Entering Throttle Check.\n", th)
defer fmt.Printf("\tThread %v Leaving Throttle Check.\n", th)
for {
p := time.Now().Sub(t.last)
if p < t.delay {
fmt.Printf("\tThread %v Sleeping %v.\n", th, t.delay-p)
time.Sleep(t.delay-p)
} else {
fmt.Printf("\tThread %v No longer Throttled.\n", th)
t.last = time.Now()
break
}
}
}
func (t *tStruct) worker(rch <-chan string, sch chan<- string, th int) {
fmt.Printf("Thread %v starting up.\n", th)
defer fmt.Printf("Thread %v Dead.\n", th)
sch <-"READY"
for {
r := <-rch
fmt.Printf("Thread %v received %v\n", th, r)
switch r {
case "STOP":
fmt.Printf("Thread %v returning.\n", th)
sch <-"QUITTING"
return
default:
fmt.Printf("Thread %v processing %v.\n", th, r)
t.throttle(th)
fmt.Printf("Thread %v done with %v.\n", th, r)
sch <-"OK"
}
}
}
func main() {
ts := tStruct{}
ts.delay = 500*time.Millisecond
ts.last = time.Now()
sch := make(chan string)
rch := make(chan string)
tC := 3
tA := 0
fmt.Println("Starting Threads")
for i:=1; i<(tC+1); i++ {
go ts.worker(sch, rch, i)
r := <-rch
if r=="READY" {
tA++
} else {
fmt.Println("ERROR not READY")
}
}
fmt.Println("Feeding All Threads")
for i:=1; i<(tC+1); i++ {
sch <- "WORK"
}
fmt.Println("Listening for threads")
for tA > 0{
r := <-rch
switch r {
case "QUITTING":
tA--
fmt.Println("main received QUITTING")
continue
case "OK":
fmt.Println("main received OK")
sch <-"STOP"
continue
default:
fmt.Println("Shouldn't be here!!!")
}
}
}
实际上有一种更简单的方法可以做到这一点:create a time ticker。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
rateLimit := time.Tick(500 * time.Millisecond)
<-rateLimit
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
<-rateLimit
fmt.Println("Hello", i)
wg.Done()
}(i)
}
wg.Wait()
}
您的新代码更好。正如另一个答案中提到的,你参加了一场比赛。 Go 有一个内置的竞争检测器 go build -race
。这是一个了不起的工具,并且可以通过良好的单元测试找到适合您的比赛。
我认为您最初的假设之一是有缺陷的。通过调整所有 API 呼叫的节奏,您可以消除任何突发的机会。在您的方案中,每个 API 调用都会受到延迟影响,即使它可能不需要。除非您确定每个 API 调用都会达到极限,否则有更好的方法。
启动一个 time.NewTicker 到 10 秒并将计数器初始化为 0。为每个 API 请求增加计数器。如果计数器达到 20,则 goroutine 会休眠直到计时器关闭。当计时器关闭时重置计数器并继续休眠 goroutines。
我一直想编写一个 API 速率限制器,所以我编写了它,您可以在这里看到它:
https://github.com/tildeleb/limiter/blob/master/limiter.go
除了例子,它是未经测试的。任何反馈,请在 github.
上创建问题
所以我正在编写一个实用程序来查询工作中的 API,他们每 10 秒限制到 20 个调用。很简单,我会将我的通话时间限制在自上次通话后至少 0.5 秒。在我尝试使用 goroutine 之前,我的 Throttle 实用程序运行良好。
现在我正在使用 struct/method 组合:
func (c *CTKAPI) Throttle() {
if c.Debug{fmt.Println("\t\t\tEntering Throttle()")}
for { //in case something else makes a call while we're sleeping, we need to re-check
if t := time.Now().Sub(c.LastCallTime); t < c.ThrottleTime {
if c.Debug{fmt.Printf("\t\t\tThrottle: Sleeping %v\n", c.ThrottleTime - t)}
time.Sleep(c.ThrottleTime - t)
} else {
if c.Debug{fmt.Println("\t\t\tThrottle: Released.")}
break
}
}
c.LastCallTime = time.Now()
if c.Debug{fmt.Println("\t\t\tExiting Throttle()")}
}
然后我在每个 goroutine 中的每次调用之前调用 whatever.Throttle() 以确保我在启动下一个调用之前至少等待了半秒。
但这似乎不可靠,并且会产生不可预测的结果。是否有更优雅的方式来限制并发请求?
-迈克
因为您引入了数据竞争,多个例程正在访问/更改 c.LastCallTime。
您可以使用 time.Tick
或将 c.LastCallTime
设为 int64
(c.LastCallTime = time.Now().Unix()
) 并使用 atomic.LoadInt64/StoreInt64
进行检查。
回答我自己的问题,因为我昨晚很晚偶然发现了一些解决方案。 对我来说最简单的答案是使用 sync.Mutex 变量来锁定和解锁油门功能,以确保我不会同时不小心击中它。另一个选择是将我的节流服务移动到它自己的 goroutine 函数中(从而消除并发调用)并与通道通信 throttled/OK,但对于这个应用程序,Mutex 是一个更干净的解决方案。 以下是寻找类似解决方案的工作代码的简化版本:
package main
import (
"fmt"
"time"
"sync"
)
type tStruct struct {
delay time.Duration
last time.Time
lock sync.Mutex //this will be our locking variable
}
func (t *tStruct) createT() *tStruct {
return &tStruct {
delay: 500*time.Millisecond,
last: time.Now(),
}
}
func (t *tStruct) throttle(th int) {
//we lock our function, and any other routine calling this function will block.
t.lock.Lock()
//and we'll defer an unlock, so when we exit the throttle, we'll be ready for another call.
defer t.lock.Unlock()
fmt.Printf("\tThread %v Entering Throttle Check.\n", th)
defer fmt.Printf("\tThread %v Leaving Throttle Check.\n", th)
for {
p := time.Now().Sub(t.last)
if p < t.delay {
fmt.Printf("\tThread %v Sleeping %v.\n", th, t.delay-p)
time.Sleep(t.delay-p)
} else {
fmt.Printf("\tThread %v No longer Throttled.\n", th)
t.last = time.Now()
break
}
}
}
func (t *tStruct) worker(rch <-chan string, sch chan<- string, th int) {
fmt.Printf("Thread %v starting up.\n", th)
defer fmt.Printf("Thread %v Dead.\n", th)
sch <-"READY"
for {
r := <-rch
fmt.Printf("Thread %v received %v\n", th, r)
switch r {
case "STOP":
fmt.Printf("Thread %v returning.\n", th)
sch <-"QUITTING"
return
default:
fmt.Printf("Thread %v processing %v.\n", th, r)
t.throttle(th)
fmt.Printf("Thread %v done with %v.\n", th, r)
sch <-"OK"
}
}
}
func main() {
ts := tStruct{}
ts.delay = 500*time.Millisecond
ts.last = time.Now()
sch := make(chan string)
rch := make(chan string)
tC := 3
tA := 0
fmt.Println("Starting Threads")
for i:=1; i<(tC+1); i++ {
go ts.worker(sch, rch, i)
r := <-rch
if r=="READY" {
tA++
} else {
fmt.Println("ERROR not READY")
}
}
fmt.Println("Feeding All Threads")
for i:=1; i<(tC+1); i++ {
sch <- "WORK"
}
fmt.Println("Listening for threads")
for tA > 0{
r := <-rch
switch r {
case "QUITTING":
tA--
fmt.Println("main received QUITTING")
continue
case "OK":
fmt.Println("main received OK")
sch <-"STOP"
continue
default:
fmt.Println("Shouldn't be here!!!")
}
}
}
实际上有一种更简单的方法可以做到这一点:create a time ticker。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
rateLimit := time.Tick(500 * time.Millisecond)
<-rateLimit
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
<-rateLimit
fmt.Println("Hello", i)
wg.Done()
}(i)
}
wg.Wait()
}
您的新代码更好。正如另一个答案中提到的,你参加了一场比赛。 Go 有一个内置的竞争检测器 go build -race
。这是一个了不起的工具,并且可以通过良好的单元测试找到适合您的比赛。
我认为您最初的假设之一是有缺陷的。通过调整所有 API 呼叫的节奏,您可以消除任何突发的机会。在您的方案中,每个 API 调用都会受到延迟影响,即使它可能不需要。除非您确定每个 API 调用都会达到极限,否则有更好的方法。
启动一个 time.NewTicker 到 10 秒并将计数器初始化为 0。为每个 API 请求增加计数器。如果计数器达到 20,则 goroutine 会休眠直到计时器关闭。当计时器关闭时重置计数器并继续休眠 goroutines。
我一直想编写一个 API 速率限制器,所以我编写了它,您可以在这里看到它: https://github.com/tildeleb/limiter/blob/master/limiter.go
除了例子,它是未经测试的。任何反馈,请在 github.
上创建问题