如何在执行一组工人之间适当地延迟
How to properly delay between executing a pool of workers
美好的一天,
我正在尝试在工作人员执行之间实现正确的延迟,例如,工作人员必须完成 30 个任务并休眠 5 秒,我如何在代码中准确跟踪30 个任务 已完成,仅在完成后才进入休眠 5 秒?
下面是创建 30 名工人 池的代码,这些工人依次以无序方式一次执行 30 件任务,代码如下:
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
if i != 0 && i%30 == 0 {
fmt.Printf("SLEEPAGE 5 sec...")
time.Sleep(10 * time.Second)
}
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 30
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
播放:https://go.dev/play/p/lehl7hoo-kp
具体如何确保30个任务完成以及在何处插入延迟尚不清楚,如有帮助,将不胜感激
好的,让我们从这个工作示例开始:
func Test_t(t *testing.T) {
// just a published, this publishes result on a chan
publish := func(s int, ch chan int, wg *sync.WaitGroup) {
ch <- s // this is blocking!!!
wg.Done()
}
wg := &sync.WaitGroup{}
wg.Add(100)
// we'll use done channel to notify the work is done
res := make(chan int)
done := make(chan struct{})
// create worker that will notify that all results were published
go func() {
wg.Wait()
done <- struct{}{}
}()
// let's create a jobs that publish on our res chan
// please note all goroutines are created immediately
for i := 0; i < 100; i++ {
go publish(i, res, wg)
}
// lets get 30 args and then wait
var resCounter int
forloop:
for {
select {
case ss := <-res:
println(ss)
resCounter += 1
// break the loop
if resCounter%30 == 0 {
// after receiving 30 results we are blocking this thread
// no more results will be taken from the channel for 5 seconds
println("received 30 results, waiting...")
time.Sleep(5 * time.Second)
}
case <-done:
// we are done here, let's break this infinite loop
break forloop
}
}
}
我希望这能说明如何做到这一点。
那么,您的代码有什么问题?
老实说,看起来不错(我的意思是发布了 30 个结果,然后是代码等待,然后是另外 30 个结果,等等),但问题是 您想在哪里等待?
我猜有几种可能:
创建工人(这就是您的代码现在的工作方式,如我所见,它以 30 包的形式发布工作;请注意 digit
中的 2 秒延迟函数仅适用于执行代码的goroutine)
触发工人(所以“等待”代码应该在工人函数中,不允许 运行 更多工人 - 所以它必须观察有多少结果被发布)
处理结果(这就是我的代码的工作方式,forloop
中的正确同步)
美好的一天,
我正在尝试在工作人员执行之间实现正确的延迟,例如,工作人员必须完成 30 个任务并休眠 5 秒,我如何在代码中准确跟踪30 个任务 已完成,仅在完成后才进入休眠 5 秒?
下面是创建 30 名工人 池的代码,这些工人依次以无序方式一次执行 30 件任务,代码如下:
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
if i != 0 && i%30 == 0 {
fmt.Printf("SLEEPAGE 5 sec...")
time.Sleep(10 * time.Second)
}
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 30
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
播放:https://go.dev/play/p/lehl7hoo-kp
具体如何确保30个任务完成以及在何处插入延迟尚不清楚,如有帮助,将不胜感激
好的,让我们从这个工作示例开始:
func Test_t(t *testing.T) {
// just a published, this publishes result on a chan
publish := func(s int, ch chan int, wg *sync.WaitGroup) {
ch <- s // this is blocking!!!
wg.Done()
}
wg := &sync.WaitGroup{}
wg.Add(100)
// we'll use done channel to notify the work is done
res := make(chan int)
done := make(chan struct{})
// create worker that will notify that all results were published
go func() {
wg.Wait()
done <- struct{}{}
}()
// let's create a jobs that publish on our res chan
// please note all goroutines are created immediately
for i := 0; i < 100; i++ {
go publish(i, res, wg)
}
// lets get 30 args and then wait
var resCounter int
forloop:
for {
select {
case ss := <-res:
println(ss)
resCounter += 1
// break the loop
if resCounter%30 == 0 {
// after receiving 30 results we are blocking this thread
// no more results will be taken from the channel for 5 seconds
println("received 30 results, waiting...")
time.Sleep(5 * time.Second)
}
case <-done:
// we are done here, let's break this infinite loop
break forloop
}
}
}
我希望这能说明如何做到这一点。
那么,您的代码有什么问题? 老实说,看起来不错(我的意思是发布了 30 个结果,然后是代码等待,然后是另外 30 个结果,等等),但问题是 您想在哪里等待?
我猜有几种可能:
创建工人(这就是您的代码现在的工作方式,如我所见,它以 30 包的形式发布工作;请注意
digit
中的 2 秒延迟函数仅适用于执行代码的goroutine)触发工人(所以“等待”代码应该在工人函数中,不允许 运行 更多工人 - 所以它必须观察有多少结果被发布)
处理结果(这就是我的代码的工作方式,
forloop
中的正确同步)