如何在 Go 中使用 go 并发以获得更好的运行时

How to use go concurrency for a better runtime in Go

我正在使用简单的问题示例尝试 Go 中的并发性,即第 n 个回文素数,例如:第 1 到第 9 个回文序列是 2、3、5、7、11、101、131、151。我被卡住了不知道该怎么办。

我现在的代码是这样的:

n := 9999999

count := 0

primePalindrome := 0

for i := 0; count < n; i++ {
    go PrimeNumber(i)
    go PalindromeNumber(i)
    
    if PrimeNumber(i) && PalindromeNumber(i) {
        primePalindrome = i
        count++
    }
}

我怎么知道函数 PrimeNumber(i) 和 PalindromeNumber(i) 已经由 go routine 执行和完成,以便我可以给出 IF CONDITION 来获取数字?

这里有多个问题需要解决:

  • 我们想分拆“是素数”和“是回文”测试
  • 我们想按顺序通过测试
  • 的数字
  • 当然,我们必须表达问题的“分拆”和“等待结果”部分在我们的编程语言(在本例中为 Go)中。

(除此之外,我们可能想优化我们的素数测试,可能使用 Sieve of Eratothenese 算法或类似算法,这也可能涉及并行性。)

这里的中间问题可能是最难的。不过,一种相当明显的方法:我们可以观察到,如果我们为每个数字分配一个ascending-order数字tested , 返回的答案 (nis/isnot suitable), 即使以错误的顺序返回, 也很容易 re-shuffled 排序.

由于您的整个循环递增 1(这是一种错误1),为此测试的数字本身就是 suitable。所以我们应该创建一个 Go 通道,其类型是一对结果:这是我测试的数字,这是我的答案:

type result struct {
    tested int  // the number tested
    passed bool // pass/fail result
}
testC := make(chan int)
resultC := make(chan result)

接下来,我们将使用典型的 "pool of workers"。然后我们 运行 我们的循环 things-to-test。这是您现有的循环:

for i := 0; count < n; i++ {
    go PrimeNumber(i)
    go PalindromeNumber(i)
    
    if PrimeNumber(i) && PalindromeNumber(i) {
        primePalindrome = i
        count++
    }
}

我们将其重组为:

count := 0
busy := 0
results := []result{}
for toTest := 0;; toTest += 2 {
    // if all workers are busy, wait for one result
    if busy >= numWorkers {
        result := <-resultC // get one result
        busy--
        results := addResult(results, result)
        if result.passed {
            count++ // passed: increment count
            if count >= n {
                break
            }
        }
    }
    // still working, so test this number
    testC <- toTest
    busy++
}
close(testC) // tell workers to stop working
// collect remaining results
for result := range resultC {
    results := addResult(results, result)
}

(“繁忙”测试有点笨拙;您可以使用 select 发送或接收,取而代之的是先执行哪个,但是如果您这样做,下面概述的优化会得到一个稍微复杂一点。)

这确实意味着我们的标准工作池模式需要关闭 result 通道 resultC,这意味着我们将在生成时添加一个 sync.WaitGroup numWorkers 名工人:

var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
    go worker(&wg, testC, resultC)
}
go func() {
    wg.Wait()
    close(resultC)
}()

这使我们的 for result := range resultC 循环工作;当我们关闭 testC 时,所有工作人员都停止(并且 return 并通过他们的 defer 调用 wg.Done(),此处未显示)所以 resultC 关闭一次最后一个工人退出。

现在我们还有一个问题,那就是:结果返回的顺序是semi-random。这就是为什么我们有一些结果。 addResult 函数需要扩展切片并将结果插入到适当的位置,使用 value-tested。当主循环执行到break语句时,toTest中的数至少是第n个回文素数,但也有可能是第n大。所以我们需要收集剩余的结果并向后看,看看是否某个更早的数字实际上是第 n 个。

此时有许多优化要做:特别是,如果我们已经通过 k 测试了数字并且它们都已知通过或失败并且k + numWorkers < n,我们不再需要这些结果中的任何一个(无论它们通过 还是 失败)所以我们可以缩小结果切片。或者,如果我们有兴趣构建 table 个回文素数,我们可以这样做,或者我们可以选择的任何其他方式。以上并不是 最佳 解决方案,只是 a 解决方案。

再次注意我们“过冲”:无论 numWorkers 是什么,我们最多可以测试根本不需要测试的 numWorkers-1 个值。同样, 可能 可以通过让每个工人提前退出(使用某种退出指示器,无论是“完成”渠道还是只是一个 sync/atomic 变量)来优化,如果他们我正在处理一个高于第 now-known-to-be-at-least-n 个值的数字。


1我们可以通过从答案 pre-loaded 开始将问题减半,如果您 choose to consider 1 prime—see also http://ncatlab.org/nlab/show/too+simple+to+be+simple 则回答 1 和 2。然后我们 运行 每次都从 3 向上循环 2,这样我们甚至不必费心测试偶数,因为 2 是唯一的偶数素数。

答案取决于您问题的许多方面

例如,如果每个步骤都是独立的,则可以使用下面的示例。如果下一步取决于上一步,则需要找到其他解决方案。

例如:回文数不依赖于前一个数。您可以并行化回文检测。而质数是一个连续的代码。

也许通过检查除数直到该数的平方根来检查回文是否为质数。您需要对其进行基准测试

numbers := make(chan int, 1000) // add some buffer

var wg sync.WaitGroup

for … { // number of consumers 
    wg.Add(1)
    go consume(&wg, numbers)
}

for i … {
    numbers <- i
}

close(numbers)

wg.Wait()

// here all consumers ended properly
…

func consume(wg *sync.WaitGroup, numbers chan int){
    defer wg.Done()
    for i := range numbers {
         // use i
    }
}

这是我的解决方案:https://go.dev/play/p/KShMctUK9yg

The question is, what if I want to find the 9999999th palindrome with faster runtime using Go concurrency, how to apply concurrency in PrimeNumber() and PalindromeNumber() function?

与 Torek 类似,我通过创建多个 worker 来增加并发性,每个 worker 都可以检查一个数字,从而并行检查多个数字。

从算法上讲,程序是这样工作的。一个 goroutine 生成可能的素数回文。多个 goroutines 的池都检查候选者。主协程收集结果。当我们至少有足够的结果来提供第 nth 个素数回文时,我们对列表进行排序,然后 return 给出答案。

仔细查看在 goroutine 之间通信的通道和等待组的使用:

  • 多个工作协程。他们的工作是 运行 回文,然后检查候选数字。他们在 candidates 频道收听;当通道关闭时,它们结束,并向 wg sync.WaitGroup.

    传达它们已完成的信息
  • 一个候选生成器。这个 goroutine 通过发送到 candidates 通道将每个候选人发送给其中一个工人。如果发现 done 通道被关闭,则结束。

  • main goroutine 也作为结果的收集器。它从 resc(结果)通道读取并将结果添加到列表中。当列表至少达到要求的长度时,它会关闭 done,向生成器发出停止生成候选人的信号。

done 通道可能看起来多余,但它很重要,因为 main 结果收集器知道我们何时生成候选人,但不是发送到 candidates 的那个。如果我们在 main 中关闭 candidates,生成器很有可能会尝试写入它,这会使程序崩溃。生成器是一个写作候选者;它是唯一一个“知道”不会再写入候选程序的 goroutine。

请注意,此实现在 least n 处生成素数回文。由于它并行生成素数回文,因此不能保证它们是有序的。在最坏的情况下,我们最多可以生成素数回文 n+m,其中 m 是工人数减一。

这里有很大的改进空间。我很确定 generatorcollector 角色可以在一个 select 循环中合并到候选通道和结果通道上。如果 n9999999 一样大,当我在我的 windows 机器上 运行 它时,该程序似乎也很难 - 看看是否你的结果会有所不同。

编辑:性能增强

如果您希望提高性能,这里是我昨晚发现并注意到的一些事情。

  • 不需要校验偶数。 for i := start; ; i += 1 + (i % 2) 跳到下一个奇数,然后每隔一次加 2 以保持奇数。

  • all palindromes of even numbered decimal length are divisble by 11。因此可以跳过整组偶数长度的数字。我通过将 math.Pow10(len(str)) 添加到偶数十进制表示来添加另一个数字来做到这一点。这就是导致程序在很长一段时间内停止输出数字的原因 - 每组偶数数字都不能产生素数回文。

  • 如果数字的十进制表示法以偶数开头,则它不能是素数回文,除非它只有一位数长。 5 也是如此。在下面的代码中,我将 math.Pow10(len(str)-1) 添加到数字以移动到下一个奇数序列。如果数字以 5 开头,我将其加倍以移动到下一个奇数序列。

这些技巧使代码的性能提高了很多,但它仍然是一天结束时的蛮力,我仍然没有接近 9999999。

  // send candidates
  go func() {
    // send candidates out
    for i := start; ; i += 1 + (i % 2) {
      str := strconv.FormatInt(i, 10)
      if len(str) % 2 == 0 && i != 11 {
        newi := int64(math.Pow10(len(str)))+1
        log.Printf("%d => %d", i, newi)
        i = newi
        continue
      }
      if first := (str[0]-'0'); first % 2 == 0 || first == 5 {
        if i < 10 {
          continue
        }
        nexti :=  int64(math.Pow10(len(str)-1))
        if first == 5 {
          nexti *= 2
        }
        newi := i + nexti
        log.Printf("%d -> %d", i, newi)
        i = newi-2
        continue
      }
      select {
      case _, ok := <-done:
        if !ok {
          close(candidates)
          return
        }

      case candidates <- i:
        continue
      }
    }
  }()