Go并发循环逻辑
Go Concurrency Circular Logic
我刚刚开始使用 Go 并发并尝试创建一个 dispatch go 例程,它将作业发送到在 jobchan 通道上侦听的工作池。如果一条消息通过 dispatchchan 通道进入我的调度函数,而我的其他 go 例程正忙,则该消息将附加到调度程序中的堆栈切片上,调度程序将在稍后有工作人员可用时尝试再次发送,and/or dispatchchan 上没有收到更多消息。这是因为 dispatchchan 和 jobchan 是无缓冲的,并且工人 运行 的 go 例程会将其他消息附加到调度程序直到某个点,我不希望工人阻塞等待调度程序并创建一个僵局。这是我到目前为止提出的调度程序代码:
func dispatch() {
var stack []string
acount := 0
for {
select {
case d := <-dispatchchan:
stack = append(stack, d)
case c := <-mw:
acount = acount + c
case jobchan <-stack[0]:
if len(stack) > 1 {
stack[0] = stack[len(stack)-1]
stack = stack[:len(stack)-1]
} else {
stack = nil
}
default:
if acount == 0 && len(stack) == 0 {
close(jobchan)
close(dispatchchan)
close(mw)
wg.Done()
return
}
}
}
完整示例位于 https://play.golang.wiki/p/X6kXVNUn5N7
mw 通道是一个缓冲通道,其长度与 worker go routines 的数量相同。它充当工作池的信号量。如果 worker 例程正在执行 [m] 有意义的 [w] ork,它会在 mw 通道上抛出 int 1,当它完成其工作并返回到 for 循环监听 jobchan 时,它会在 mw 上抛出 int -1。这样调度员就知道工作池是否正在做任何工作,或者池是否空闲。如果池空闲并且堆栈上没有更多消息,则调度程序关闭通道并return控制主函数。
这一切都很好,但我遇到的问题是堆栈本身的长度可能为零,因此在我尝试将堆栈 [0] 发送到 jobchan 的情况下,如果堆栈为空,我会遇到错误边界错误。我想弄清楚的是如何确保当我遇到这种情况时,stack[0] 是否有值。我不希望那种情况向 jobchan 发送一个空字符串。
非常感谢任何帮助。如果有我应该考虑的更惯用的并发模式,我很想听听。我并没有 100% 相信这个解决方案,但这是迄今为止我得到的最远的。
This is all good but the issue I have is that the stack itself could be zero length so the case where I attempt to send stack[0] to the jobchan, if the stack is empty, I get an out of bounds error.
我无法用你的 playground link 重现它,但它是可信的,因为至少有一个 gofunc
工作人员可能已经准备好在那个频道上接收。
我的输出是 Msgcnt: 0
,这也很容易解释,因为 gofunc
可能 而不是 已经准备好在 jobschan
上接收当 dispatch()
运行 是 select
。这些操作的顺序未定义。
trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel
频道不需要调度程序。频道是调度员。
If a message comes into my dispatch function via the dispatchchan channel and my other go routines are busy, the message is [...] will [...] send again later when a worker becomes available, [...] or no more messages are received on the dispatchchan.
通过一些创造性的编辑,很容易将其变成接近缓冲通道定义的东西。它可以立即读取,或它最多可以占用一些“limit
”无法立即发送的消息。您确实定义了 limit
,尽管它没有在您的代码中的其他地方使用。
在任何函数中,定义一个你不读取的变量都会导致像 limit declared but not used
这样的编译时错误。这种限制提高了代码质量并有助于识别 typeos。但是在包范围内,您已经将未使用的 limit
定义为“全局”,从而避免了一个有用的错误——您没有限制任何东西。
不要使用全局变量。使用传递的参数来定义作用域,因为作用域的定义等同于go
关键字所表达的函数式并发。 将在local作用域中定义的相关通道传递给在包作用域中定义的函数,以便您可以轻松跟踪它们的关系。并使用 directional channels 强制执行函数之间的 producer/consumer 关系。稍后会详细介绍。
回到“限制”,限制您排队的作业数量是有意义的,因为所有资源都是有限的,并且接受比您预期的处理更多的消息需要比进程内存提供的更持久的存储.如果您觉得没有义务满足这些要求无论如何,首先不要接受“太多”的要求。
那么,dispatchchan
和dispatch()
有什么功能呢?在处理之前存储有限数量的待处理请求(如果有的话),然后将它们发送给下一个可用的工作人员? 正是缓冲通道的作用。
Circular Logic
谁“知道”您的程序何时完成? main()
提供初始输入,但您在 `dispatch():
中关闭了所有 3 个通道
close(jobchan)
close(dispatchchan)
close(mw)
您的工作人员写入他们自己的作业队列,因此只有当工作人员完成写入后才能关闭传入的作业队列。然而,个别工作人员也不知道何时关闭作业队列,因为其他工作人员正在写入。 没有人 知道您的算法何时完成。这是你的循环逻辑。
The mw channel is a buffered channel the same length as the number of worker go routines. It acts as a semaphore for the worker pool.
此处存在竞争条件。考虑所有 n
个工人刚刚收到最后 n
个工作的情况。他们每个人都从 jobschan
中读取并且正在检查 ok
的值。 disptatcher
继续 运行 其 select
。现在没有人写入 dispatchchan
或从 jobschan
读取,所以 default
的情况会立即匹配。 len(stack)
是 0
并且没有当前的 job
,因此 dispatcher
关闭了所有频道,包括 mw
。此后的某个时刻,一个工作人员试图写入一个关闭的通道并发生恐慌。
所以我终于准备好提供一些代码了,但我还有一个问题:我没有明确的问题陈述来编写代码。
I'm just getting into concurrency in Go and trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel.
goroutines 之间的通道就像同步齿轮的牙齿。但是齿轮转动的目的是什么?您不是要计时,也不是要制作发条玩具。你的齿轮可以转动,但成功是什么样子的呢?他们的转向?
让我们尝试为通道定义一个更具体的用例:给定一组任意长的持续时间作为标准输入上的字符串*,让 n
个工作人员中的一个休眠那么多秒。所以我们实际上有一个 结果 到 return,我们会说每个工人将 return 持续时间为 运行 的开始和结束时间.
- 为了它可以 运行 在操场上,我将使用硬编码字节缓冲区模拟标准输入。
package main
import (
"bufio"
"bytes"
"fmt"
"os"
"strings"
"sync"
"time"
)
type SleepResult struct {
worker_id int
duration time.Duration
start time.Time
end time.Time
}
func main() {
var num_workers = 2
workchan := make(chan time.Duration)
resultschan := make(chan SleepResult)
var wg sync.WaitGroup
var resultswg sync.WaitGroup
resultswg.Add(1)
go results(&resultswg, resultschan)
for i := 0; i < num_workers; i++ {
wg.Add(1)
go worker(i, &wg, workchan, resultschan)
}
// playground doesn't have stdin
var input = bytes.NewBufferString(
strings.Join([]string{
"3ms",
"1 seconds",
"3600ms",
"300 ms",
"5s",
"0.05min"}, "\n") + "\n")
var scanner = bufio.NewScanner(input)
for scanner.Scan() {
text := scanner.Text()
if dur, err := time.ParseDuration(text); err != nil {
fmt.Fprintln(os.Stderr, "Invalid duration", text)
} else {
workchan <- dur
}
}
close(workchan) // we know when our inputs are done
wg.Wait() // and when our jobs are done
close(resultschan)
resultswg.Wait()
}
func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) {
for res := range resultschan {
fmt.Printf("Worker %d: %s : %s => %s\n",
res.worker_id, res.duration,
res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
}
wg.Done()
}
func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) {
var res = SleepResult{worker_id: id}
for dur := range jobchan {
res.duration = dur
res.start = time.Now()
time.Sleep(res.duration)
res.end = time.Now()
resultschan <- res
}
wg.Done()
}
这里我使用了 2 个等待组,一个用于 workers,一个用于结果。这确保我在 main()
结束之前完成所有结果的写入。我通过让每个函数一次只做一件事来保持我的函数简单:main 读取输入,从中解析持续时间,并将它们发送给下一个工作人员。 results
函数收集结果并将它们打印到标准输出。工人负责睡眠,从 jobchan
读取并写入 resultschan
.
workchan
可以缓冲(或不缓冲,如本例);没关系,因为输入将以可以处理的速率读取。我们可以根据需要缓冲尽可能多的输入,但我们不能缓冲无限量的输入。我已将通道大小设置为 1e6
- 但一百万远小于无限。对于我的用例,我根本不需要做任何缓冲。
main
知道输入完成后可以关闭 jobschan
。 main
也知道作业何时完成 (wg.Wait()
) 并且可以关闭结果通道。关闭这些通道是 worker
和 results
goroutines 的一个重要信号 - 它们可以区分空通道和保证不会有任何新添加的通道。
for job := range jobchan {...}
是 shorthand 你的更详细:
for {
job, ok := <- jobchan
if !ok {
wg.Done()
return
}
...
}
请注意,此代码创建了 2 个工人,但它可以创建 20 个或 2000 个,甚至 1 个。无论池中有多少工人,程序都会运行。它可以处理任何数量的输入(尽管没完没了的输入当然会导致没完没了的程序)。它不会创建输出到输入的循环。如果您的用例需要工作来创造更多工作,那么这是一个更具挑战性的场景,通常可以通过仔细规划来避免。
我希望这能给你一些关于如何在你的 Go 应用程序中更好地使用并发的好主意。
我刚刚开始使用 Go 并发并尝试创建一个 dispatch go 例程,它将作业发送到在 jobchan 通道上侦听的工作池。如果一条消息通过 dispatchchan 通道进入我的调度函数,而我的其他 go 例程正忙,则该消息将附加到调度程序中的堆栈切片上,调度程序将在稍后有工作人员可用时尝试再次发送,and/or dispatchchan 上没有收到更多消息。这是因为 dispatchchan 和 jobchan 是无缓冲的,并且工人 运行 的 go 例程会将其他消息附加到调度程序直到某个点,我不希望工人阻塞等待调度程序并创建一个僵局。这是我到目前为止提出的调度程序代码:
func dispatch() {
var stack []string
acount := 0
for {
select {
case d := <-dispatchchan:
stack = append(stack, d)
case c := <-mw:
acount = acount + c
case jobchan <-stack[0]:
if len(stack) > 1 {
stack[0] = stack[len(stack)-1]
stack = stack[:len(stack)-1]
} else {
stack = nil
}
default:
if acount == 0 && len(stack) == 0 {
close(jobchan)
close(dispatchchan)
close(mw)
wg.Done()
return
}
}
}
完整示例位于 https://play.golang.wiki/p/X6kXVNUn5N7
mw 通道是一个缓冲通道,其长度与 worker go routines 的数量相同。它充当工作池的信号量。如果 worker 例程正在执行 [m] 有意义的 [w] ork,它会在 mw 通道上抛出 int 1,当它完成其工作并返回到 for 循环监听 jobchan 时,它会在 mw 上抛出 int -1。这样调度员就知道工作池是否正在做任何工作,或者池是否空闲。如果池空闲并且堆栈上没有更多消息,则调度程序关闭通道并return控制主函数。
这一切都很好,但我遇到的问题是堆栈本身的长度可能为零,因此在我尝试将堆栈 [0] 发送到 jobchan 的情况下,如果堆栈为空,我会遇到错误边界错误。我想弄清楚的是如何确保当我遇到这种情况时,stack[0] 是否有值。我不希望那种情况向 jobchan 发送一个空字符串。
非常感谢任何帮助。如果有我应该考虑的更惯用的并发模式,我很想听听。我并没有 100% 相信这个解决方案,但这是迄今为止我得到的最远的。
This is all good but the issue I have is that the stack itself could be zero length so the case where I attempt to send stack[0] to the jobchan, if the stack is empty, I get an out of bounds error.
我无法用你的 playground link 重现它,但它是可信的,因为至少有一个 gofunc
工作人员可能已经准备好在那个频道上接收。
我的输出是 Msgcnt: 0
,这也很容易解释,因为 gofunc
可能 而不是 已经准备好在 jobschan
上接收当 dispatch()
运行 是 select
。这些操作的顺序未定义。
trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel
频道不需要调度程序。频道是调度员。
If a message comes into my dispatch function via the dispatchchan channel and my other go routines are busy, the message is [...] will [...] send again later when a worker becomes available, [...] or no more messages are received on the dispatchchan.
通过一些创造性的编辑,很容易将其变成接近缓冲通道定义的东西。它可以立即读取,或它最多可以占用一些“limit
”无法立即发送的消息。您确实定义了 limit
,尽管它没有在您的代码中的其他地方使用。
在任何函数中,定义一个你不读取的变量都会导致像 limit declared but not used
这样的编译时错误。这种限制提高了代码质量并有助于识别 typeos。但是在包范围内,您已经将未使用的 limit
定义为“全局”,从而避免了一个有用的错误——您没有限制任何东西。
不要使用全局变量。使用传递的参数来定义作用域,因为作用域的定义等同于go
关键字所表达的函数式并发。 将在local作用域中定义的相关通道传递给在包作用域中定义的函数,以便您可以轻松跟踪它们的关系。并使用 directional channels 强制执行函数之间的 producer/consumer 关系。稍后会详细介绍。
回到“限制”,限制您排队的作业数量是有意义的,因为所有资源都是有限的,并且接受比您预期的处理更多的消息需要比进程内存提供的更持久的存储.如果您觉得没有义务满足这些要求无论如何,首先不要接受“太多”的要求。
那么,dispatchchan
和dispatch()
有什么功能呢?在处理之前存储有限数量的待处理请求(如果有的话),然后将它们发送给下一个可用的工作人员? 正是缓冲通道的作用。
Circular Logic
谁“知道”您的程序何时完成? main()
提供初始输入,但您在 `dispatch():
close(jobchan)
close(dispatchchan)
close(mw)
您的工作人员写入他们自己的作业队列,因此只有当工作人员完成写入后才能关闭传入的作业队列。然而,个别工作人员也不知道何时关闭作业队列,因为其他工作人员正在写入。 没有人 知道您的算法何时完成。这是你的循环逻辑。
The mw channel is a buffered channel the same length as the number of worker go routines. It acts as a semaphore for the worker pool.
此处存在竞争条件。考虑所有 n
个工人刚刚收到最后 n
个工作的情况。他们每个人都从 jobschan
中读取并且正在检查 ok
的值。 disptatcher
继续 运行 其 select
。现在没有人写入 dispatchchan
或从 jobschan
读取,所以 default
的情况会立即匹配。 len(stack)
是 0
并且没有当前的 job
,因此 dispatcher
关闭了所有频道,包括 mw
。此后的某个时刻,一个工作人员试图写入一个关闭的通道并发生恐慌。
所以我终于准备好提供一些代码了,但我还有一个问题:我没有明确的问题陈述来编写代码。
I'm just getting into concurrency in Go and trying to create a dispatch go routine that will send jobs to a worker pool listening on the jobchan channel.
goroutines 之间的通道就像同步齿轮的牙齿。但是齿轮转动的目的是什么?您不是要计时,也不是要制作发条玩具。你的齿轮可以转动,但成功是什么样子的呢?他们的转向?
让我们尝试为通道定义一个更具体的用例:给定一组任意长的持续时间作为标准输入上的字符串*,让 n
个工作人员中的一个休眠那么多秒。所以我们实际上有一个 结果 到 return,我们会说每个工人将 return 持续时间为 运行 的开始和结束时间.
- 为了它可以 运行 在操场上,我将使用硬编码字节缓冲区模拟标准输入。
package main
import (
"bufio"
"bytes"
"fmt"
"os"
"strings"
"sync"
"time"
)
type SleepResult struct {
worker_id int
duration time.Duration
start time.Time
end time.Time
}
func main() {
var num_workers = 2
workchan := make(chan time.Duration)
resultschan := make(chan SleepResult)
var wg sync.WaitGroup
var resultswg sync.WaitGroup
resultswg.Add(1)
go results(&resultswg, resultschan)
for i := 0; i < num_workers; i++ {
wg.Add(1)
go worker(i, &wg, workchan, resultschan)
}
// playground doesn't have stdin
var input = bytes.NewBufferString(
strings.Join([]string{
"3ms",
"1 seconds",
"3600ms",
"300 ms",
"5s",
"0.05min"}, "\n") + "\n")
var scanner = bufio.NewScanner(input)
for scanner.Scan() {
text := scanner.Text()
if dur, err := time.ParseDuration(text); err != nil {
fmt.Fprintln(os.Stderr, "Invalid duration", text)
} else {
workchan <- dur
}
}
close(workchan) // we know when our inputs are done
wg.Wait() // and when our jobs are done
close(resultschan)
resultswg.Wait()
}
func results(wg *sync.WaitGroup, resultschan <-chan SleepResult) {
for res := range resultschan {
fmt.Printf("Worker %d: %s : %s => %s\n",
res.worker_id, res.duration,
res.start.Format(time.RFC3339Nano), res.end.Format(time.RFC3339Nano))
}
wg.Done()
}
func worker(id int, wg *sync.WaitGroup, jobchan <-chan time.Duration, resultschan chan<- SleepResult) {
var res = SleepResult{worker_id: id}
for dur := range jobchan {
res.duration = dur
res.start = time.Now()
time.Sleep(res.duration)
res.end = time.Now()
resultschan <- res
}
wg.Done()
}
这里我使用了 2 个等待组,一个用于 workers,一个用于结果。这确保我在 main()
结束之前完成所有结果的写入。我通过让每个函数一次只做一件事来保持我的函数简单:main 读取输入,从中解析持续时间,并将它们发送给下一个工作人员。 results
函数收集结果并将它们打印到标准输出。工人负责睡眠,从 jobchan
读取并写入 resultschan
.
workchan
可以缓冲(或不缓冲,如本例);没关系,因为输入将以可以处理的速率读取。我们可以根据需要缓冲尽可能多的输入,但我们不能缓冲无限量的输入。我已将通道大小设置为 1e6
- 但一百万远小于无限。对于我的用例,我根本不需要做任何缓冲。
main
知道输入完成后可以关闭 jobschan
。 main
也知道作业何时完成 (wg.Wait()
) 并且可以关闭结果通道。关闭这些通道是 worker
和 results
goroutines 的一个重要信号 - 它们可以区分空通道和保证不会有任何新添加的通道。
for job := range jobchan {...}
是 shorthand 你的更详细:
for {
job, ok := <- jobchan
if !ok {
wg.Done()
return
}
...
}
请注意,此代码创建了 2 个工人,但它可以创建 20 个或 2000 个,甚至 1 个。无论池中有多少工人,程序都会运行。它可以处理任何数量的输入(尽管没完没了的输入当然会导致没完没了的程序)。它不会创建输出到输入的循环。如果您的用例需要工作来创造更多工作,那么这是一个更具挑战性的场景,通常可以通过仔细规划来避免。
我希望这能给你一些关于如何在你的 Go 应用程序中更好地使用并发的好主意。