如何在 Go 中使用 go 并发以获得更好的运行时
How to use go concurrency for a better runtime in Go
我正在使用简单的问题示例尝试 Go 中的并发性,即第 n 个回文素数,例如:第 1 到第 9 个回文序列是 2、3、5、7、11、101、131、151。我被卡住了不知道该怎么办。
我现在的代码是这样的:
n := 9999999
count := 0
primePalindrome := 0
for i := 0; count < n; i++ {
go PrimeNumber(i)
go PalindromeNumber(i)
if PrimeNumber(i) && PalindromeNumber(i) {
primePalindrome = i
count++
}
}
我怎么知道函数 PrimeNumber(i) 和 PalindromeNumber(i) 已经由 go routine 执行和完成,以便我可以给出 IF CONDITION 来获取数字?
这里有多个问题需要解决:
- 我们想分拆“是素数”和“是回文”测试
- 我们想按顺序通过测试
的数字
- 当然,我们必须表达问题的“分拆”和“等待结果”部分在我们的编程语言(在本例中为 Go)中。
(除此之外,我们可能想优化我们的素数测试,可能使用 Sieve of Eratothenese 算法或类似算法,这也可能涉及并行性。)
这里的中间问题可能是最难的。不过,是一种相当明显的方法:我们可以观察到,如果我们为每个数字分配一个ascending-order数字tested , 返回的答案 (nis/isnot suitable), 即使以错误的顺序返回, 也很容易 re-shuffled 排序.
由于您的整个循环递增 1(这是一种错误1),为此测试的数字本身就是 suitable。所以我们应该创建一个 Go 通道,其类型是一对结果:这是我测试的数字,这是我的答案:
type result struct {
tested int // the number tested
passed bool // pass/fail result
}
testC := make(chan int)
resultC := make(chan result)
接下来,我们将使用典型的 "pool of workers"。然后我们 运行 我们的循环 things-to-test。这是您现有的循环:
for i := 0; count < n; i++ {
go PrimeNumber(i)
go PalindromeNumber(i)
if PrimeNumber(i) && PalindromeNumber(i) {
primePalindrome = i
count++
}
}
我们将其重组为:
count := 0
busy := 0
results := []result{}
for toTest := 0;; toTest += 2 {
// if all workers are busy, wait for one result
if busy >= numWorkers {
result := <-resultC // get one result
busy--
results := addResult(results, result)
if result.passed {
count++ // passed: increment count
if count >= n {
break
}
}
}
// still working, so test this number
testC <- toTest
busy++
}
close(testC) // tell workers to stop working
// collect remaining results
for result := range resultC {
results := addResult(results, result)
}
(“繁忙”测试有点笨拙;您可以使用 select 发送或接收,取而代之的是先执行哪个,但是如果您这样做,下面概述的优化会得到一个稍微复杂一点。)
这确实意味着我们的标准工作池模式需要关闭 result 通道 resultC
,这意味着我们将在生成时添加一个 sync.WaitGroup
numWorkers
名工人:
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go worker(&wg, testC, resultC)
}
go func() {
wg.Wait()
close(resultC)
}()
这使我们的 for result := range resultC
循环工作;当我们关闭 testC
时,所有工作人员都停止(并且 return 并通过他们的 defer
调用 wg.Done()
,此处未显示)所以 resultC
关闭一次最后一个工人退出。
现在我们还有一个问题,那就是:结果返回的顺序是semi-random。这就是为什么我们有一些结果。 addResult
函数需要扩展切片并将结果插入到适当的位置,使用 value-tested。当主循环执行到break
语句时,toTest
中的数至少是第n个回文素数,但也有可能是比第n大。所以我们需要收集剩余的结果并向后看,看看是否某个更早的数字实际上是第 n 个。
此时有许多优化要做:特别是,如果我们已经通过 k 测试了数字并且它们都已知通过或失败并且k + numWorkers < n,我们不再需要这些结果中的任何一个(无论它们通过 还是 失败)所以我们可以缩小结果切片。或者,如果我们有兴趣构建 table 个回文素数,我们可以这样做,或者我们可以选择的任何其他方式。以上并不是 最佳 解决方案,只是 a 解决方案。
再次注意我们“过冲”:无论 numWorkers
是什么,我们最多可以测试根本不需要测试的 numWorkers-1
个值。同样, 可能 可以通过让每个工人提前退出(使用某种退出指示器,无论是“完成”渠道还是只是一个 sync/atomic
变量)来优化,如果他们我正在处理一个高于第 now-known-to-be-at-least-n 个值的数字。
1我们可以通过从答案 pre-loaded 开始将问题减半,如果您 choose to consider 1 prime—see also http://ncatlab.org/nlab/show/too+simple+to+be+simple 则回答 1 和 2。然后我们 运行 每次都从 3 向上循环 2,这样我们甚至不必费心测试偶数,因为 2 是唯一的偶数素数。
答案取决于您问题的许多方面
例如,如果每个步骤都是独立的,则可以使用下面的示例。如果下一步取决于上一步,则需要找到其他解决方案。
例如:回文数不依赖于前一个数。您可以并行化回文检测。而质数是一个连续的代码。
也许通过检查除数直到该数的平方根来检查回文是否为质数。您需要对其进行基准测试
numbers := make(chan int, 1000) // add some buffer
var wg sync.WaitGroup
for … { // number of consumers
wg.Add(1)
go consume(&wg, numbers)
}
for i … {
numbers <- i
}
close(numbers)
wg.Wait()
// here all consumers ended properly
…
func consume(wg *sync.WaitGroup, numbers chan int){
defer wg.Done()
for i := range numbers {
// use i
}
}
这是我的解决方案:https://go.dev/play/p/KShMctUK9yg
The question is, what if I want to find the 9999999th palindrome with faster runtime using Go concurrency, how to apply concurrency in PrimeNumber() and PalindromeNumber() function?
与 Torek 类似,我通过创建多个 worker 来增加并发性,每个 worker 都可以检查一个数字,从而并行检查多个数字。
从算法上讲,程序是这样工作的。一个 goroutine 生成可能的素数回文。多个 goroutines 的池都检查候选者。主协程收集结果。当我们至少有足够的结果来提供第 nth 个素数回文时,我们对列表进行排序,然后 return 给出答案。
仔细查看在 goroutine 之间通信的通道和等待组的使用:
多个工作协程。他们的工作是 运行 回文,然后检查候选数字。他们在 candidates
频道收听;当通道关闭时,它们结束,并向 wg sync.WaitGroup
.
传达它们已完成的信息
一个候选生成器。这个 goroutine 通过发送到 candidates
通道将每个候选人发送给其中一个工人。如果发现 done
通道被关闭,则结束。
main goroutine 也作为结果的收集器。它从 resc
(结果)通道读取并将结果添加到列表中。当列表至少达到要求的长度时,它会关闭 done
,向生成器发出停止生成候选人的信号。
done
通道可能看起来多余,但它很重要,因为 main
结果收集器知道我们何时生成候选人,但不是发送到 candidates
的那个。如果我们在 main 中关闭 candidates
,生成器很有可能会尝试写入它,这会使程序崩溃。生成器是一个写作候选者;它是唯一一个“知道”不会再写入候选程序的 goroutine。
请注意,此实现在 least n 处生成素数回文。由于它并行生成素数回文,因此不能保证它们是有序的。在最坏的情况下,我们最多可以生成素数回文 n+m
,其中 m 是工人数减一。
这里有很大的改进空间。我很确定 generator
和 collector
角色可以在一个 select
循环中合并到候选通道和结果通道上。如果 n 和 9999999
一样大,当我在我的 windows 机器上 运行 它时,该程序似乎也很难 - 看看是否你的结果会有所不同。
编辑:性能增强
如果您希望提高性能,这里是我昨晚发现并注意到的一些事情。
不需要校验偶数。 for i := start; ; i += 1 + (i % 2)
跳到下一个奇数,然后每隔一次加 2 以保持奇数。
all palindromes of even numbered decimal length are divisble by 11。因此可以跳过整组偶数长度的数字。我通过将 math.Pow10(len(str))
添加到偶数十进制表示来添加另一个数字来做到这一点。这就是导致程序在很长一段时间内停止输出数字的原因 - 每组偶数数字都不能产生素数回文。
如果数字的十进制表示法以偶数开头,则它不能是素数回文,除非它只有一位数长。 5 也是如此。在下面的代码中,我将 math.Pow10(len(str)-1)
添加到数字以移动到下一个奇数序列。如果数字以 5 开头,我将其加倍以移动到下一个奇数序列。
这些技巧使代码的性能提高了很多,但它仍然是一天结束时的蛮力,我仍然没有接近 9999999。
// send candidates
go func() {
// send candidates out
for i := start; ; i += 1 + (i % 2) {
str := strconv.FormatInt(i, 10)
if len(str) % 2 == 0 && i != 11 {
newi := int64(math.Pow10(len(str)))+1
log.Printf("%d => %d", i, newi)
i = newi
continue
}
if first := (str[0]-'0'); first % 2 == 0 || first == 5 {
if i < 10 {
continue
}
nexti := int64(math.Pow10(len(str)-1))
if first == 5 {
nexti *= 2
}
newi := i + nexti
log.Printf("%d -> %d", i, newi)
i = newi-2
continue
}
select {
case _, ok := <-done:
if !ok {
close(candidates)
return
}
case candidates <- i:
continue
}
}
}()
我正在使用简单的问题示例尝试 Go 中的并发性,即第 n 个回文素数,例如:第 1 到第 9 个回文序列是 2、3、5、7、11、101、131、151。我被卡住了不知道该怎么办。
我现在的代码是这样的:
n := 9999999
count := 0
primePalindrome := 0
for i := 0; count < n; i++ {
go PrimeNumber(i)
go PalindromeNumber(i)
if PrimeNumber(i) && PalindromeNumber(i) {
primePalindrome = i
count++
}
}
我怎么知道函数 PrimeNumber(i) 和 PalindromeNumber(i) 已经由 go routine 执行和完成,以便我可以给出 IF CONDITION 来获取数字?
这里有多个问题需要解决:
- 我们想分拆“是素数”和“是回文”测试
- 我们想按顺序通过测试 的数字
- 当然,我们必须表达问题的“分拆”和“等待结果”部分在我们的编程语言(在本例中为 Go)中。
(除此之外,我们可能想优化我们的素数测试,可能使用 Sieve of Eratothenese 算法或类似算法,这也可能涉及并行性。)
这里的中间问题可能是最难的。不过,是一种相当明显的方法:我们可以观察到,如果我们为每个数字分配一个ascending-order数字tested , 返回的答案 (nis/isnot suitable), 即使以错误的顺序返回, 也很容易 re-shuffled 排序.
由于您的整个循环递增 1(这是一种错误1),为此测试的数字本身就是 suitable。所以我们应该创建一个 Go 通道,其类型是一对结果:这是我测试的数字,这是我的答案:
type result struct {
tested int // the number tested
passed bool // pass/fail result
}
testC := make(chan int)
resultC := make(chan result)
接下来,我们将使用典型的 "pool of workers"。然后我们 运行 我们的循环 things-to-test。这是您现有的循环:
for i := 0; count < n; i++ {
go PrimeNumber(i)
go PalindromeNumber(i)
if PrimeNumber(i) && PalindromeNumber(i) {
primePalindrome = i
count++
}
}
我们将其重组为:
count := 0
busy := 0
results := []result{}
for toTest := 0;; toTest += 2 {
// if all workers are busy, wait for one result
if busy >= numWorkers {
result := <-resultC // get one result
busy--
results := addResult(results, result)
if result.passed {
count++ // passed: increment count
if count >= n {
break
}
}
}
// still working, so test this number
testC <- toTest
busy++
}
close(testC) // tell workers to stop working
// collect remaining results
for result := range resultC {
results := addResult(results, result)
}
(“繁忙”测试有点笨拙;您可以使用 select 发送或接收,取而代之的是先执行哪个,但是如果您这样做,下面概述的优化会得到一个稍微复杂一点。)
这确实意味着我们的标准工作池模式需要关闭 result 通道 resultC
,这意味着我们将在生成时添加一个 sync.WaitGroup
numWorkers
名工人:
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go worker(&wg, testC, resultC)
}
go func() {
wg.Wait()
close(resultC)
}()
这使我们的 for result := range resultC
循环工作;当我们关闭 testC
时,所有工作人员都停止(并且 return 并通过他们的 defer
调用 wg.Done()
,此处未显示)所以 resultC
关闭一次最后一个工人退出。
现在我们还有一个问题,那就是:结果返回的顺序是semi-random。这就是为什么我们有一些结果。 addResult
函数需要扩展切片并将结果插入到适当的位置,使用 value-tested。当主循环执行到break
语句时,toTest
中的数至少是第n个回文素数,但也有可能是比第n大。所以我们需要收集剩余的结果并向后看,看看是否某个更早的数字实际上是第 n 个。
此时有许多优化要做:特别是,如果我们已经通过 k 测试了数字并且它们都已知通过或失败并且k + numWorkers < n,我们不再需要这些结果中的任何一个(无论它们通过 还是 失败)所以我们可以缩小结果切片。或者,如果我们有兴趣构建 table 个回文素数,我们可以这样做,或者我们可以选择的任何其他方式。以上并不是 最佳 解决方案,只是 a 解决方案。
再次注意我们“过冲”:无论 numWorkers
是什么,我们最多可以测试根本不需要测试的 numWorkers-1
个值。同样, 可能 可以通过让每个工人提前退出(使用某种退出指示器,无论是“完成”渠道还是只是一个 sync/atomic
变量)来优化,如果他们我正在处理一个高于第 now-known-to-be-at-least-n 个值的数字。
1我们可以通过从答案 pre-loaded 开始将问题减半,如果您 choose to consider 1 prime—see also http://ncatlab.org/nlab/show/too+simple+to+be+simple 则回答 1 和 2。然后我们 运行 每次都从 3 向上循环 2,这样我们甚至不必费心测试偶数,因为 2 是唯一的偶数素数。
答案取决于您问题的许多方面
例如,如果每个步骤都是独立的,则可以使用下面的示例。如果下一步取决于上一步,则需要找到其他解决方案。
例如:回文数不依赖于前一个数。您可以并行化回文检测。而质数是一个连续的代码。
也许通过检查除数直到该数的平方根来检查回文是否为质数。您需要对其进行基准测试
numbers := make(chan int, 1000) // add some buffer
var wg sync.WaitGroup
for … { // number of consumers
wg.Add(1)
go consume(&wg, numbers)
}
for i … {
numbers <- i
}
close(numbers)
wg.Wait()
// here all consumers ended properly
…
func consume(wg *sync.WaitGroup, numbers chan int){
defer wg.Done()
for i := range numbers {
// use i
}
}
这是我的解决方案:https://go.dev/play/p/KShMctUK9yg
The question is, what if I want to find the 9999999th palindrome with faster runtime using Go concurrency, how to apply concurrency in PrimeNumber() and PalindromeNumber() function?
与 Torek 类似,我通过创建多个 worker 来增加并发性,每个 worker 都可以检查一个数字,从而并行检查多个数字。
从算法上讲,程序是这样工作的。一个 goroutine 生成可能的素数回文。多个 goroutines 的池都检查候选者。主协程收集结果。当我们至少有足够的结果来提供第 nth 个素数回文时,我们对列表进行排序,然后 return 给出答案。
仔细查看在 goroutine 之间通信的通道和等待组的使用:
多个工作协程。他们的工作是 运行 回文,然后检查候选数字。他们在
传达它们已完成的信息candidates
频道收听;当通道关闭时,它们结束,并向wg sync.WaitGroup
.一个候选生成器。这个 goroutine 通过发送到
candidates
通道将每个候选人发送给其中一个工人。如果发现done
通道被关闭,则结束。main goroutine 也作为结果的收集器。它从
resc
(结果)通道读取并将结果添加到列表中。当列表至少达到要求的长度时,它会关闭done
,向生成器发出停止生成候选人的信号。
done
通道可能看起来多余,但它很重要,因为 main
结果收集器知道我们何时生成候选人,但不是发送到 candidates
的那个。如果我们在 main 中关闭 candidates
,生成器很有可能会尝试写入它,这会使程序崩溃。生成器是一个写作候选者;它是唯一一个“知道”不会再写入候选程序的 goroutine。
请注意,此实现在 least n 处生成素数回文。由于它并行生成素数回文,因此不能保证它们是有序的。在最坏的情况下,我们最多可以生成素数回文 n+m
,其中 m 是工人数减一。
这里有很大的改进空间。我很确定 generator
和 collector
角色可以在一个 select
循环中合并到候选通道和结果通道上。如果 n 和 9999999
一样大,当我在我的 windows 机器上 运行 它时,该程序似乎也很难 - 看看是否你的结果会有所不同。
编辑:性能增强
如果您希望提高性能,这里是我昨晚发现并注意到的一些事情。
不需要校验偶数。
for i := start; ; i += 1 + (i % 2)
跳到下一个奇数,然后每隔一次加 2 以保持奇数。all palindromes of even numbered decimal length are divisble by 11。因此可以跳过整组偶数长度的数字。我通过将
math.Pow10(len(str))
添加到偶数十进制表示来添加另一个数字来做到这一点。这就是导致程序在很长一段时间内停止输出数字的原因 - 每组偶数数字都不能产生素数回文。如果数字的十进制表示法以偶数开头,则它不能是素数回文,除非它只有一位数长。 5 也是如此。在下面的代码中,我将
math.Pow10(len(str)-1)
添加到数字以移动到下一个奇数序列。如果数字以 5 开头,我将其加倍以移动到下一个奇数序列。
这些技巧使代码的性能提高了很多,但它仍然是一天结束时的蛮力,我仍然没有接近 9999999。
// send candidates
go func() {
// send candidates out
for i := start; ; i += 1 + (i % 2) {
str := strconv.FormatInt(i, 10)
if len(str) % 2 == 0 && i != 11 {
newi := int64(math.Pow10(len(str)))+1
log.Printf("%d => %d", i, newi)
i = newi
continue
}
if first := (str[0]-'0'); first % 2 == 0 || first == 5 {
if i < 10 {
continue
}
nexti := int64(math.Pow10(len(str)-1))
if first == 5 {
nexti *= 2
}
newi := i + nexti
log.Printf("%d -> %d", i, newi)
i = newi-2
continue
}
select {
case _, ok := <-done:
if !ok {
close(candidates)
return
}
case candidates <- i:
continue
}
}
}()