Go:通过通道传递函数
Go: Passing functions through channels
我正在尝试通过将调用的函数放入队列中以便稍后访问来限制它们的速率。下面是我创建的一部分请求,requestHandler 函数以特定速率处理每个请求。
我希望它接受具有不同类型参数的各种函数,因此是 interface{} 类型。
如何通过通道传递函数并成功调用它们?
type request struct {
function interface{}
channel chan interface{}
}
var requestQueue []request
func pushQueue(f interface{}, ch chan interface{}) {
req := request{
f,
ch,
}
//push
requestQueue = append(requestQueue, req)
}
func requestHandler() {
for {
if len(requestQueue) > 0 {
//pop
req := requestQueue[len(requestQueue)-1]
requestQueue = requestQueue[:len(requestQueue)-1]
req.channel <- req.function
}
<-time.After(1200 * time.Millisecond)
}
}
这是我正在尝试实现的示例(GetLeagueEntries(string, string) 和 GetSummonerName(int, int) 是函数):
ch := make(chan interface{})
pushQueue(l.GetLeagueEntries, ch)
pushQueue(l.GetSummonerName, ch)
leagues, _ := <-ch(string1, string2)
summoners, _ := <-ch(int1, int2)
我原以为使用某种信号量或工作池会更容易。这样一来,您可以做任何事情的工人数量有限。也可以有多个工作池。
您需要这些电话中的任何一个才能成为 concurrent/asynchronous 吗?如果没有,可以调用它们以便您可以配置睡眠(一个讨厌的 hack 头脑)。
尝试使用工作线程池或信号量而不是一系列函数。
首先,我会写成:
leagues := server.GetLeagueEntries()
summoners := server.GetSummoners()
并且,将速率限制放入服务器。使用 rate-limiting 个库之一。
但是可以用一个接口统一请求,用一个func类型允许闭包(如http.HandleFunc):
type Command interface {
Execute(server *Server)
}
type CommandFunc func(server *Server)
func (fn CommandFunc) Execute(server *Server) { fn(server) }
type GetLeagueEntries struct { Leagues []League }
func (entries *GetLeagueEntries) Execute(server *Server) {
// ...
}
func GetSummonerName(id int, result *string) CommandFunc {
return CommandFunc(func(server *Server){
*result = "hello"
})
}
get := GetLeagueEnties{}
requests <- &get
requests <- CommandFunc(func(server *Server){
// ... handle struff here
})
当然,这需要一些同步。
好的,这是代码:https://play.golang.org/p/XZvb_4BaJF
请注意,它并不完美。您有一个每秒执行一次的队列。如果队列为空并添加了新项目,则新项目在执行之前可能会等待将近一秒钟。
但这应该让你非常接近你所需要的:)
此代码可分为 3 部分:
- 速率受限的队列执行器,我称之为服务器(我很不擅长命名)——服务器对函数一无所知。它所做的就是启动一个 never-ending goroutine,每秒弹出队列中最旧的函数,然后调用它。我上面说的问题在代码的这一部分顺便说一句,如果你愿意,我可以帮你解决它。
- 按钮点击功能 - 这向您展示了每次按钮点击如何使用服务器调用 3 个差异函数(您显然可以进行 more/less 函数调用)并确保它们彼此相隔 1 秒其他。您甚至可以为任何函数添加超时(以伪造延迟),它们仍然会间隔 1 秒被调用。这是您唯一需要通道的地方,因为您希望尽可能快地进行所有函数调用(如果第一个函数需要 5 秒,您只想等待 1 秒来调用第二个函数)然后等待它们完成,所以您需要知道它们何时完成。
按钮点击模拟(主要功能)- 这仅表明 3 次按钮点击会按预期工作。你也可以将它们放在一个 goroutine 中来模拟 3 个用户同时单击按钮,它仍然有效。
package main
import (
"fmt"
"sync"
"time"
)
const (
requestFreq = time.Second
)
type (
// A single request
request func()
// The server that will hold a queue of requests and make them once a requestFreq
server struct {
// This will tick once per requestFreq
ticker *time.Ticker
requests []request
// Mutex for working with the request slice
sync.RWMutex
}
)
var (
createServerOnce sync.Once
s *server
)
func main() {
// Multiple button clicks:
ButtonClick()
ButtonClick()
ButtonClick()
fmt.Println("Done!")
}
// BUTTON LOGIC:
// Calls 3 functions and returns 3 diff values.
// Each function is called at least 1 second appart.
func ButtonClick() (val1 int, val2 string, val3 bool) {
iCh := make(chan int)
sCh := make(chan string)
bCh := make(chan bool)
go func(){
Server().AppendRequest(func() {
t := time.Now()
fmt.Println("Calling func1 (time: " + t.Format("15:04:05") + ")")
// do some stuff
iCh <- 1
})
}()
go func(){
Server().AppendRequest(func() {
t := time.Now()
fmt.Println("Calling func2 (time: " + t.Format("15:04:05") + ")")
// do some stuff
sCh <- "Yo"
})
}()
go func(){
Server().AppendRequest(func() {
t := time.Now()
fmt.Println("Calling func3 (time: " + t.Format("15:04:05") + ")")
// do some stuff
bCh <- true
})
}()
// Wait for all 3 calls to come back
for count := 0; count < 3; count++ {
select {
case val1 = <-iCh:
case val2 = <-sCh:
case val3 = <-bCh:
}
}
return
}
// SERVER LOGIC
// Factory function that will only create a single server
func Server() *server {
// Only one server for the entire application
createServerOnce.Do(func() {
s = &server{ticker: time.NewTicker(requestFreq), requests: []request{}}
// Start a thread to make requests.
go s.makeRequests()
})
return s
}
func (s *server) makeRequests() {
if s == nil || s.ticker == nil {
return
}
// This will keep going once per each requestFreq
for _ = range s.ticker.C {
var r request
// You can't just access s.requests because you are in a goroutine
// here while someone could be adding new requests outside of the
// goroutine so you have to use locks.
s.Lock()
if len(s.requests) > 0 {
// We have a lock here, which blocks all other operations
// so just shift the first request out, save it and give
// the lock back before doing any work.
r = s.requests[0]
s.requests = s.requests[1:]
}
s.Unlock()
if r != nil {
// make the request!
r()
}
}
}
func (s *server) AppendRequest(r request) {
if s == nil {
return
}
s.Lock()
s.requests = append(s.requests, r)
s.Unlock()
}
我正在尝试通过将调用的函数放入队列中以便稍后访问来限制它们的速率。下面是我创建的一部分请求,requestHandler 函数以特定速率处理每个请求。
我希望它接受具有不同类型参数的各种函数,因此是 interface{} 类型。
如何通过通道传递函数并成功调用它们?
type request struct {
function interface{}
channel chan interface{}
}
var requestQueue []request
func pushQueue(f interface{}, ch chan interface{}) {
req := request{
f,
ch,
}
//push
requestQueue = append(requestQueue, req)
}
func requestHandler() {
for {
if len(requestQueue) > 0 {
//pop
req := requestQueue[len(requestQueue)-1]
requestQueue = requestQueue[:len(requestQueue)-1]
req.channel <- req.function
}
<-time.After(1200 * time.Millisecond)
}
}
这是我正在尝试实现的示例(GetLeagueEntries(string, string) 和 GetSummonerName(int, int) 是函数):
ch := make(chan interface{})
pushQueue(l.GetLeagueEntries, ch)
pushQueue(l.GetSummonerName, ch)
leagues, _ := <-ch(string1, string2)
summoners, _ := <-ch(int1, int2)
我原以为使用某种信号量或工作池会更容易。这样一来,您可以做任何事情的工人数量有限。也可以有多个工作池。
您需要这些电话中的任何一个才能成为 concurrent/asynchronous 吗?如果没有,可以调用它们以便您可以配置睡眠(一个讨厌的 hack 头脑)。
尝试使用工作线程池或信号量而不是一系列函数。
首先,我会写成:
leagues := server.GetLeagueEntries()
summoners := server.GetSummoners()
并且,将速率限制放入服务器。使用 rate-limiting 个库之一。
但是可以用一个接口统一请求,用一个func类型允许闭包(如http.HandleFunc):
type Command interface {
Execute(server *Server)
}
type CommandFunc func(server *Server)
func (fn CommandFunc) Execute(server *Server) { fn(server) }
type GetLeagueEntries struct { Leagues []League }
func (entries *GetLeagueEntries) Execute(server *Server) {
// ...
}
func GetSummonerName(id int, result *string) CommandFunc {
return CommandFunc(func(server *Server){
*result = "hello"
})
}
get := GetLeagueEnties{}
requests <- &get
requests <- CommandFunc(func(server *Server){
// ... handle struff here
})
当然,这需要一些同步。
好的,这是代码:https://play.golang.org/p/XZvb_4BaJF
请注意,它并不完美。您有一个每秒执行一次的队列。如果队列为空并添加了新项目,则新项目在执行之前可能会等待将近一秒钟。
但这应该让你非常接近你所需要的:)
此代码可分为 3 部分:
- 速率受限的队列执行器,我称之为服务器(我很不擅长命名)——服务器对函数一无所知。它所做的就是启动一个 never-ending goroutine,每秒弹出队列中最旧的函数,然后调用它。我上面说的问题在代码的这一部分顺便说一句,如果你愿意,我可以帮你解决它。
- 按钮点击功能 - 这向您展示了每次按钮点击如何使用服务器调用 3 个差异函数(您显然可以进行 more/less 函数调用)并确保它们彼此相隔 1 秒其他。您甚至可以为任何函数添加超时(以伪造延迟),它们仍然会间隔 1 秒被调用。这是您唯一需要通道的地方,因为您希望尽可能快地进行所有函数调用(如果第一个函数需要 5 秒,您只想等待 1 秒来调用第二个函数)然后等待它们完成,所以您需要知道它们何时完成。
按钮点击模拟(主要功能)- 这仅表明 3 次按钮点击会按预期工作。你也可以将它们放在一个 goroutine 中来模拟 3 个用户同时单击按钮,它仍然有效。
package main import ( "fmt" "sync" "time" ) const ( requestFreq = time.Second ) type ( // A single request request func() // The server that will hold a queue of requests and make them once a requestFreq server struct { // This will tick once per requestFreq ticker *time.Ticker requests []request // Mutex for working with the request slice sync.RWMutex } ) var ( createServerOnce sync.Once s *server ) func main() { // Multiple button clicks: ButtonClick() ButtonClick() ButtonClick() fmt.Println("Done!") } // BUTTON LOGIC: // Calls 3 functions and returns 3 diff values. // Each function is called at least 1 second appart. func ButtonClick() (val1 int, val2 string, val3 bool) { iCh := make(chan int) sCh := make(chan string) bCh := make(chan bool) go func(){ Server().AppendRequest(func() { t := time.Now() fmt.Println("Calling func1 (time: " + t.Format("15:04:05") + ")") // do some stuff iCh <- 1 }) }() go func(){ Server().AppendRequest(func() { t := time.Now() fmt.Println("Calling func2 (time: " + t.Format("15:04:05") + ")") // do some stuff sCh <- "Yo" }) }() go func(){ Server().AppendRequest(func() { t := time.Now() fmt.Println("Calling func3 (time: " + t.Format("15:04:05") + ")") // do some stuff bCh <- true }) }() // Wait for all 3 calls to come back for count := 0; count < 3; count++ { select { case val1 = <-iCh: case val2 = <-sCh: case val3 = <-bCh: } } return } // SERVER LOGIC // Factory function that will only create a single server func Server() *server { // Only one server for the entire application createServerOnce.Do(func() { s = &server{ticker: time.NewTicker(requestFreq), requests: []request{}} // Start a thread to make requests. go s.makeRequests() }) return s } func (s *server) makeRequests() { if s == nil || s.ticker == nil { return } // This will keep going once per each requestFreq for _ = range s.ticker.C { var r request // You can't just access s.requests because you are in a goroutine // here while someone could be adding new requests outside of the // goroutine so you have to use locks. s.Lock() if len(s.requests) > 0 { // We have a lock here, which blocks all other operations // so just shift the first request out, save it and give // the lock back before doing any work. r = s.requests[0] s.requests = s.requests[1:] } s.Unlock() if r != nil { // make the request! r() } } } func (s *server) AppendRequest(r request) { if s == nil { return } s.Lock() s.requests = append(s.requests, r) s.Unlock() }