具有缓冲作业和固定轮询间隔的工作池
Worker pool with buffered jobs and fixed polling interval
我有一个工作人员池在工作频道上侦听,并在结果频道上响应。
作业制作者必须 运行 在固定的代码间隔内。在读取刚好足够的新作业以填满缓冲区之前,必须刷新结果。 批量刷新结果和读取新作业至关重要。
请参阅下面的示例代码,运行 它位于 playground here。
是否可以在没有跟踪 飞行中 作业的原子计数器的情况下重写它?
// Worker pool with buffered jobs and fixed polling interval
package main
import (
"fmt"
"math/rand"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
// buf is the size of the jobs buffer
buf := 5
// workers is the number of workers to start
workers := 3
// jobs chan for workers
jobs := make(chan int, buf)
// results chan for workers
results := make(chan int, buf*2)
// jobID is incremented for each job sent on the jobs chan
var jobID int
// inflight is a count of the items in the jobs chan buffer
var inflight uint64
// pollInterval for jobs producer
pollInterval := 500 * time.Millisecond
// pollDone chan to stop polling
pollDone := make(chan bool)
// jobMultiplier on pollInterval for random job processing times
jobMultiplier := 5
// done chan to exit program
done := make(chan bool)
// Start workers
wg := sync.WaitGroup{}
for n := 0; n < workers; n++ {
wg.Add(1)
go (func(n int) {
defer wg.Done()
for {
// Receive from channel or block
jobID, more := <-jobs
if more {
// To subtract a signed positive constant value...
// https://golang.org/pkg/sync/atomic/#AddUint64
c := atomic.AddUint64(&inflight, ^uint64(0))
fmt.Println(
fmt.Sprintf("worker %v processing %v - %v jobs left",
n, jobID, c))
// Processing the job...
m := rand.Intn(jobMultiplier)
time.Sleep(time.Duration(m) * pollInterval)
results <- jobID
} else {
fmt.Println(fmt.Sprintf("worker %v exited", n))
return
}
}
})(n)
}
// Signal to exit
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Println("ctrl+c to exit")
go (func() {
ticker := time.NewTicker(pollInterval)
r := make([]string, 0)
flushResults := func() {
fmt.Println(
fmt.Sprintf("===> results: %v", strings.Join(r, ",")))
r = make([]string, 0)
}
for {
select {
case <-ticker.C:
flushResults()
// Fetch jobs
c := atomic.LoadUint64(&inflight)
d := uint64(buf) - c
for i := 0; i < int(d); i++ {
jobID++
jobs <- jobID
atomic.AddUint64(&inflight, 1)
}
fmt.Println(fmt.Sprintf("===> send %v jobs", d))
case jobID := <-results:
r = append(r, fmt.Sprintf("%v", jobID))
case <-pollDone:
// Stop polling for new jobs
ticker.Stop()
// Close jobs channel to stop workers
close(jobs)
// Wait for workers to exit
wg.Wait()
close(results)
// Flush remaining results
for {
jobID, more := <-results
if more {
r = append(r, fmt.Sprintf("%v", jobID))
} else {
break
}
}
flushResults()
// Done!
done <- true
return
}
}
})()
// Wait for exit signal
<-sig
fmt.Println("---------| EXIT |---------")
pollDone <- true
<-done
fmt.Println("...done")
}
这是您的代码的基于通道的版本,在功能上等同于上述示例的意图。关键点是我们没有使用任何原子值来改变代码的逻辑,因为这不提供 goroutine 之间的同步。 goroutines 之间的所有交互都使用通道同步,sync.WaitGroup
,或 context.Context
。可能有更好的方法来解决手头的问题,但这表明不需要原子来协调队列和工作人员。
这里 goroutine 之间唯一未协调的值是在日志输出中使用 len(jobs)
。使用它是否有意义取决于你,因为它的值在并发世界中没有意义,但它是 安全 因为它是同步的并发使用并且没有基于的逻辑值。
buf := 5
workers := 3
jobs := make(chan int, buf)
// results buffer must always be larger than workers + buf to prevent deadlock
results := make(chan int, buf*2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start workers
var wg sync.WaitGroup
for n := 0; n < workers; n++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for jobID := range jobs {
fmt.Printf("worker %v processing %v - %v jobs left\n", n, jobID, len(jobs))
time.Sleep(time.Duration(rand.Intn(5)) * pollInterval)
results <- jobID
}
fmt.Printf("worker %v exited", n)
}(n)
}
var done sync.WaitGroup
done.Add(1)
go func() {
defer done.Done()
ticker := time.NewTicker(pollInterval)
r := make([]string, 0)
flushResults := func() {
fmt.Printf("===> results: %v\n", strings.Join(r, ","))
r = r[:0]
}
for {
select {
case <-ticker.C:
flushResults()
// send max buf jobs, or fill the queue
for i := 0; i < buf; i++ {
jobID++
select {
case jobs <- jobID:
continue
}
break
}
fmt.Printf("===> send %v jobs\n", i)
case jobID := <-results:
r = append(r, fmt.Sprintf("%v", jobID))
case <-ctx.Done():
// Close jobs channel to stop workers
close(jobs)
// Wait for workers to exit
wg.Wait()
// we can close results for easy iteration because we know
// there are no more workers.
close(results)
// Flush remaining results
for jobID := range results {
r = append(r, fmt.Sprintf("%v", jobID))
}
flushResults()
return
}
}
}()
我有一个工作人员池在工作频道上侦听,并在结果频道上响应。
作业制作者必须 运行 在固定的代码间隔内。在读取刚好足够的新作业以填满缓冲区之前,必须刷新结果。 批量刷新结果和读取新作业至关重要。
请参阅下面的示例代码,运行 它位于 playground here。
是否可以在没有跟踪 飞行中 作业的原子计数器的情况下重写它?
// Worker pool with buffered jobs and fixed polling interval
package main
import (
"fmt"
"math/rand"
"os"
"os/signal"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
func main() {
rand.Seed(time.Now().UnixNano())
// buf is the size of the jobs buffer
buf := 5
// workers is the number of workers to start
workers := 3
// jobs chan for workers
jobs := make(chan int, buf)
// results chan for workers
results := make(chan int, buf*2)
// jobID is incremented for each job sent on the jobs chan
var jobID int
// inflight is a count of the items in the jobs chan buffer
var inflight uint64
// pollInterval for jobs producer
pollInterval := 500 * time.Millisecond
// pollDone chan to stop polling
pollDone := make(chan bool)
// jobMultiplier on pollInterval for random job processing times
jobMultiplier := 5
// done chan to exit program
done := make(chan bool)
// Start workers
wg := sync.WaitGroup{}
for n := 0; n < workers; n++ {
wg.Add(1)
go (func(n int) {
defer wg.Done()
for {
// Receive from channel or block
jobID, more := <-jobs
if more {
// To subtract a signed positive constant value...
// https://golang.org/pkg/sync/atomic/#AddUint64
c := atomic.AddUint64(&inflight, ^uint64(0))
fmt.Println(
fmt.Sprintf("worker %v processing %v - %v jobs left",
n, jobID, c))
// Processing the job...
m := rand.Intn(jobMultiplier)
time.Sleep(time.Duration(m) * pollInterval)
results <- jobID
} else {
fmt.Println(fmt.Sprintf("worker %v exited", n))
return
}
}
})(n)
}
// Signal to exit
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
fmt.Println("ctrl+c to exit")
go (func() {
ticker := time.NewTicker(pollInterval)
r := make([]string, 0)
flushResults := func() {
fmt.Println(
fmt.Sprintf("===> results: %v", strings.Join(r, ",")))
r = make([]string, 0)
}
for {
select {
case <-ticker.C:
flushResults()
// Fetch jobs
c := atomic.LoadUint64(&inflight)
d := uint64(buf) - c
for i := 0; i < int(d); i++ {
jobID++
jobs <- jobID
atomic.AddUint64(&inflight, 1)
}
fmt.Println(fmt.Sprintf("===> send %v jobs", d))
case jobID := <-results:
r = append(r, fmt.Sprintf("%v", jobID))
case <-pollDone:
// Stop polling for new jobs
ticker.Stop()
// Close jobs channel to stop workers
close(jobs)
// Wait for workers to exit
wg.Wait()
close(results)
// Flush remaining results
for {
jobID, more := <-results
if more {
r = append(r, fmt.Sprintf("%v", jobID))
} else {
break
}
}
flushResults()
// Done!
done <- true
return
}
}
})()
// Wait for exit signal
<-sig
fmt.Println("---------| EXIT |---------")
pollDone <- true
<-done
fmt.Println("...done")
}
这是您的代码的基于通道的版本,在功能上等同于上述示例的意图。关键点是我们没有使用任何原子值来改变代码的逻辑,因为这不提供 goroutine 之间的同步。 goroutines 之间的所有交互都使用通道同步,sync.WaitGroup
,或 context.Context
。可能有更好的方法来解决手头的问题,但这表明不需要原子来协调队列和工作人员。
这里 goroutine 之间唯一未协调的值是在日志输出中使用 len(jobs)
。使用它是否有意义取决于你,因为它的值在并发世界中没有意义,但它是 安全 因为它是同步的并发使用并且没有基于的逻辑值。
buf := 5
workers := 3
jobs := make(chan int, buf)
// results buffer must always be larger than workers + buf to prevent deadlock
results := make(chan int, buf*2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start workers
var wg sync.WaitGroup
for n := 0; n < workers; n++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
for jobID := range jobs {
fmt.Printf("worker %v processing %v - %v jobs left\n", n, jobID, len(jobs))
time.Sleep(time.Duration(rand.Intn(5)) * pollInterval)
results <- jobID
}
fmt.Printf("worker %v exited", n)
}(n)
}
var done sync.WaitGroup
done.Add(1)
go func() {
defer done.Done()
ticker := time.NewTicker(pollInterval)
r := make([]string, 0)
flushResults := func() {
fmt.Printf("===> results: %v\n", strings.Join(r, ","))
r = r[:0]
}
for {
select {
case <-ticker.C:
flushResults()
// send max buf jobs, or fill the queue
for i := 0; i < buf; i++ {
jobID++
select {
case jobs <- jobID:
continue
}
break
}
fmt.Printf("===> send %v jobs\n", i)
case jobID := <-results:
r = append(r, fmt.Sprintf("%v", jobID))
case <-ctx.Done():
// Close jobs channel to stop workers
close(jobs)
// Wait for workers to exit
wg.Wait()
// we can close results for easy iteration because we know
// there are no more workers.
close(results)
// Flush remaining results
for jobID := range results {
r = append(r, fmt.Sprintf("%v", jobID))
}
flushResults()
return
}
}
}()