Golang 非阻塞缓冲区
Golang Non-Blocking Buffer
同步示例:
type job struct {
Id int
Message string
}
for {
// getJob() blocks until job is received
job := getJob()
doSomethingWithJob(job)
}
我希望处理来自 getJob
和 doSomethingWithJob
的作业。例如getJob 可以是从 MessagingQueue 接收的有效载荷,例如 RabbitMQ/Beanstalkd 或处理 HTTP 请求。
我不想在 doSomethingWithJob
时阻止 getJob
,反之亦然。但是,我确实想控制/缓冲作业数量,以免系统超载。例如最大并发数 5.
go routines 的概念目前让我感到困惑,因此非常感谢任何正确方向的指示来帮助我学习。
更新:感谢@JimB 的帮助。为什么工人 5 总是接手工作?
jobCh := make(chan *job)
// Max 5 Workers
for i := 0; i < 5; i++ {
go func() {
for job := range jobCh {
time.Sleep(time.Second * time.Duration(rand.Intn(3)))
log.Println(i, string(job.Message))
}
}()
}
for {
job, err := getJob()
if err != nil {
log.Println("Closing Channel")
close(jobCh)
break
}
jobCh <- job
}
log.Println("Complete")
示例输出
2016/06/09 22:19:57 5 {"id":10692,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10687,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10699,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10701,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10703,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10704,"name":"Test Message"}
您可以启动 5 个 goroutines 从一个通道读取数据以调用 doSomethingWithJob
。这样,并发处理的作业不会超过 5 个。
jobCh := make(chan *job)
// start 5 workers to process jobs
for i := 0; i < 5; i++ {
go func() {
for job := range jobCh {
doSomethingWithJob(job)
}
}()
}
// send jobs to workers as fast as we can
for {
jobCh <- getJob()
}
同步示例:
type job struct {
Id int
Message string
}
for {
// getJob() blocks until job is received
job := getJob()
doSomethingWithJob(job)
}
我希望处理来自 getJob
和 doSomethingWithJob
的作业。例如getJob 可以是从 MessagingQueue 接收的有效载荷,例如 RabbitMQ/Beanstalkd 或处理 HTTP 请求。
我不想在 doSomethingWithJob
时阻止 getJob
,反之亦然。但是,我确实想控制/缓冲作业数量,以免系统超载。例如最大并发数 5.
go routines 的概念目前让我感到困惑,因此非常感谢任何正确方向的指示来帮助我学习。
更新:感谢@JimB 的帮助。为什么工人 5 总是接手工作?
jobCh := make(chan *job)
// Max 5 Workers
for i := 0; i < 5; i++ {
go func() {
for job := range jobCh {
time.Sleep(time.Second * time.Duration(rand.Intn(3)))
log.Println(i, string(job.Message))
}
}()
}
for {
job, err := getJob()
if err != nil {
log.Println("Closing Channel")
close(jobCh)
break
}
jobCh <- job
}
log.Println("Complete")
示例输出
2016/06/09 22:19:57 5 {"id":10692,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10687,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10699,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10701,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10703,"name":"Test Message"}
2016/06/09 22:19:57 5 {"id":10704,"name":"Test Message"}
您可以启动 5 个 goroutines 从一个通道读取数据以调用 doSomethingWithJob
。这样,并发处理的作业不会超过 5 个。
jobCh := make(chan *job)
// start 5 workers to process jobs
for i := 0; i < 5; i++ {
go func() {
for job := range jobCh {
doSomethingWithJob(job)
}
}()
}
// send jobs to workers as fast as we can
for {
jobCh <- getJob()
}