Goroutines sharing slices: trying to understand a data race
I'm trying to write a Go program that finds certain genes in very large files of DNA sequences. I already have a Perl program that does this, but I'd like to take advantage of goroutines to run the search in parallel ;)
Because the files are huge, my idea is to read 100 sequences at a time, hand that batch to a goroutine for analysis, read the next 100 sequences, and so on.
I'd like to thank the members of this site for their genuinely helpful explanations of slices and goroutines.
I've made the suggested changes and now pass a copy of the slice for the goroutines to work on. But a -race run still detects one data race, at the copy() call:
Thanks in advance for your input!
==================
WARNING: DATA RACE
Read by goroutine 6:
runtime.slicecopy()
/usr/lib/go-1.6/src/runtime/slice.go:113 +0x0
main.main.func1()
test_chan006.go:71 +0xd8
Previous write by main goroutine:
main.main()
test_chan006.go:63 +0x3b7
Goroutine 6 (running) created at:
main.main()
test_chan006.go:73 +0x4c9
==================
[>5HSAA098909 BA098909 ...]
Found 1 data race(s)
exit status 66
line 71 is : copy(bufCopy, buf_Seq)
line 63 is : buf_Seq = append(buf_Seq, line)
line 73 is :}(genes, buf_Seq)
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"

	"github.com/mathpl/golang-pkg-pcre/src/pkg/pcre"
)

// read_genes reads a list of genes and returns a slice of gene names
func read_genes(filename string) []string {
	var genes []string // slice of gene names
	// Open the file.
	f, _ := os.Open(filename)
	// Create a new Scanner for the file.
	scanner := bufio.NewScanner(f)
	// Loop over all lines in the file.
	for scanner.Scan() {
		line := scanner.Text()
		genes = append(genes, line)
	}
	return genes
}

// search_gene2 finds the sequences matching a gene in the genes[] slice
func search_gene2(genes []string, seqs []string) []string {
	var res []string
	for r := 0; r <= len(seqs)-1; r++ {
		for i := 0; i <= len(genes)-1; i++ {
			match := pcre.MustCompile(genes[i], 0).MatcherString(seqs[r], 0)
			if match.Matches() {
				res = append(res, seqs[r]) // if the gene matches, the sequence is appended to res
				break
			}
		}
	}
	return res
}

//###########################################
func main() {
	var slice []string
	var buf_Seq []string
	read_buff := 100 // the number of sequences analysed by one goroutine
	var wg sync.WaitGroup
	queue := make(chan []string, 100)
	filename := "fasta/sequences.tsv"
	f, _ := os.Open(filename)
	scanner := bufio.NewScanner(f)
	n := 0
	genes := read_genes("lists/genes.csv")
	for scanner.Scan() {
		line := scanner.Text()
		n += 1
		buf_Seq = append(buf_Seq, line) // store the sequences into buf_Seq
		if n == read_buff { // when the read buffer contains 100 sequences, one goroutine analyses them
			wg.Add(1)
			go func(genes, buf_Seq []string) {
				defer wg.Done()
				bufCopy := make([]string, len(buf_Seq))
				copy(bufCopy, buf_Seq)
				queue <- search_gene2(genes, bufCopy)
			}(genes, buf_Seq)
			buf_Seq = buf_Seq[:0] // reset buf_Seq
			n = 0                 // reset the sequences counter
		}
	}
	go func() {
		wg.Wait()
		close(queue)
	}()
	for t := range queue {
		slice = append(slice, t...)
	}
	fmt.Println(slice)
}
The goroutines only work with copies of the slice headers; the underlying array is the same. To copy a slice, you need to use copy (or append to a different slice):
buf_Seq = append(buf_Seq, line)
bufCopy := make([]string, len(buf_Seq))
copy(bufCopy, buf_Seq)
You can then safely pass bufCopy to the goroutines, or use it directly in the closure.
The data race exists because slices are reference types in Go. They are generally passed by value, but being reference types, any change made through one value is reflected in every other value that shares the same backing array. Consider:
package main

import "fmt"

func f(xs []string) {
	xs[0] = "changed_in_f"
}

func main() {
	xs := []string{"set_in_main", "asd"}
	fmt.Println("Before call:", xs)
	f(xs)
	fmt.Println("After call:", xs)
	var ys []string
	ys = xs
	ys[0] = "changed_through_ys"
	fmt.Println("After ys:", xs)
}
This prints:
Before call: [set_in_main asd]
After call: [changed_in_f asd]
After ys: [changed_through_ys asd]
This happens because all three slices share the same underlying array in memory. More details here.
This is likely what happens when you pass buf_Seq to search_gene2. A new slice value is passed on each call, but each of those slice values may refer to the same underlying array, leading to the potential race condition (a call to append may change a slice's underlying array).
To fix it, try this in main:
bufCopy := make([]string, len(buf_Seq))
// make a copy of buf_Seq in an entirely separate slice
copy(bufCopy, buf_Seq)
go func(genes, seqs []string) {
	defer wg.Done()
	queue <- search_gene2(genes, seqs)
}(genes, bufCopy)
The slice value is indeed a copy, but a slice is itself a reference type. Fundamentally, a slice is a three-word structure: a pointer to the start of an underlying array, an integer holding the current number of elements in the slice, and another integer holding the capacity of the underlying array. When you pass a slice to a function, a copy of this slice "header" structure is made, but that copy still refers to the same underlying array as the header that was passed in.
This means any change you make to the slice header itself - sub-slicing it, appending to it enough to trigger a resize (and therefore a reallocation to a new location, with a new start pointer), and so on - is only reflected in the slice header inside that function. Any change to the underlying data itself, however, is reflected even in the slice outside the function (unless you trigger a reallocation by exceeding the slice's capacity).
I think this is idiomatic Go (for this job):
A piece of code is worth a thousand comments:
genes = readGenes("lists/genes.csv") // read the gene list
n := runtime.NumCPU()                // the number of goroutines
wg.Add(n + 1)
go scan() // read the "fasta/sequences.tsv"
for i := 0; i < n; i++ {
	go search()
}
go WaitClose()
slice := []string{}
for t := range queue {
	slice = append(slice, t)
}
fmt.Println(slice)
scan() reads "fasta/sequences.tsv" into this channel: var ch = make(chan string, 100), concurrently; and search() is a CPU-bound goroutine, so for performance reasons the number of those goroutines is limited to NumCPU.
Try this working sample code (simulated and tested):
package main

import (
	"bufio"
	"fmt"
	//"os"
	"runtime"
	"strings"
	"sync"
	//"github.com/mathpl/golang-pkg-pcre/src/pkg/pcre"
)

func main() {
	genes = readGenes("lists/genes.csv") // read the gene list
	n := runtime.NumCPU()                // the number of goroutines
	wg.Add(n + 1)
	go scan() // read the "fasta/sequences.tsv"
	for i := 0; i < n; i++ {
		go search()
	}
	go WaitClose()
	slice := []string{}
	for t := range queue {
		slice = append(slice, t)
	}
	fmt.Println(slice)
}

var wg sync.WaitGroup
var genes []string
var ch = make(chan string, 100)
var queue = make(chan string, 100)

func scan() {
	defer wg.Done()
	defer close(ch)
	scanner := bufio.NewScanner(strings.NewReader(strings.Join([]string{"A2", "B2", "C2", "D2", "E2", "F2", "G2", "H2", "I2"}, "\n")))
	/*f, err := os.Open("fasta/sequences.tsv")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	scanner := bufio.NewScanner(f)*/
	for scanner.Scan() {
		ch <- scanner.Text()
	}
}

func match(pattern, seq string) bool {
	//return pcre.MustCompile(pattern, 0).MatcherString(seq, 0).Matches()
	return pattern[0] == seq[0]
}

func search() {
	defer wg.Done()
	for seq := range ch {
		for _, gene := range genes {
			if match(gene, seq) {
				queue <- seq
				break
			}
		}
	}
}

func WaitClose() {
	wg.Wait()
	close(queue)
}

// readGenes reads a list of genes and returns a slice of gene names.
func readGenes(filename string) []string {
	return []string{"A1", "B1", "C1", "D1", "E1", "F1", "G1", "H1", "I1"}
	/*var genes []string // slice of gene names
	f, err := os.Open(filename)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		genes = append(genes, line)
	}
	return genes*/
}
Output:
[A2 B2 C2 D2 E2 F2 G2 H2 I2]
I hope this helps with your real case (the comments in this code are toggled; untested):
package main

import (
	"bufio"
	"fmt"
	"os"
	"runtime"
	//"strings"
	"sync"

	"github.com/mathpl/golang-pkg-pcre/src/pkg/pcre"
	//pcre "regexp"
)

func main() {
	genes = readGenes("lists/genes.csv") // read the gene list
	n := runtime.NumCPU()                // the number of goroutines
	wg.Add(n + 1)
	go scan() // read the "fasta/sequences.tsv"
	for i := 0; i < n; i++ {
		go search()
	}
	go WaitClose()
	slice := []string{}
	for t := range queue {
		slice = append(slice, t)
	}
	fmt.Println(slice)
}

var wg sync.WaitGroup
var genes []string
var ch = make(chan string, 100)
var queue = make(chan string, 100)

func scan() {
	defer wg.Done()
	defer close(ch)
	//scanner := bufio.NewScanner(strings.NewReader(strings.Join([]string{"A2", "B2", "C2", "D2", "E2", "F2", "G2", "H2", "I2"}, "\n")))
	f, err := os.Open("fasta/sequences.tsv")
	if err != nil {
		panic(err)
	}
	defer f.Close()
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		ch <- scanner.Text()
	}
}

func match(pattern, seq string) bool {
	return pcre.MustCompile(pattern, 0).MatcherString(seq, 0).Matches()
	//return pattern[0] == seq[0]
	//return pcre.MustCompile(pattern).Match([]byte(seq))
}

func search() {
	defer wg.Done()
	for seq := range ch {
		for _, gene := range genes {
			if match(gene, seq) {
				queue <- seq
				break
			}
		}
	}
}

func WaitClose() {
	wg.Wait()
	close(queue)
}

// readGenes reads a list of genes and returns a slice of gene names.
func readGenes(filename string) []string {
	//return []string{"A1", "B1", "C1", "D1", "E1", "F1", "G1", "H1", "I1"}
	var genes []string // slice of gene names
	f, err := os.Open(filename)
	if err != nil {
		panic(err)
	}
	defer f.Close()
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		line := scanner.Text()
		genes = append(genes, line)
	}
	return genes
}
Problems with your code:
1- In read_genes(filename string) []string you should check the error:
f, err := os.Open(filename)
if err != nil {
	panic(err)
}
2- In read_genes(filename string) []string, close the opened file:
defer f.Close()
3- After filename := "fasta/sequences.tsv" you should check the error:
f, err := os.Open(filename)
if err != nil {
	panic(err)
}
4- After filename := "fasta/sequences.tsv", close the opened file:
defer f.Close()
5- If the file fasta/sequences.tsv does not contain a multiple of 100 lines, then inside for scanner.Scan() { the condition if n == read_buff { never fires for the last partial batch, and you lose it.
6- How many CPU cores do you have? You should limit the number of goroutines.
7- Your main problem:
I made a minimal, complete, and verifiable example (problem 5 is still there):
package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
)

func match(pattern, str string) bool {
	return pattern[0] == str[0]
}

func search_gene2(genes, seqs []string) (res []string) {
	for _, r := range seqs {
		for _, i := range genes {
			if match(i, r) {
				res = append(res, r) // if the gene matches, the sequence is appended to res
				break
			}
		}
	}
	return
}

func main() {
	read_buff := 2 // the number of sequences analysed by one goroutine
	var wg sync.WaitGroup
	queue := make(chan []string, read_buff)
	genes := []string{"A1", "B1", "C1", "D1", "E1", "F1", "G1", "H1", "I1"}
	sequences := strings.Join([]string{"A2", "B2", "C2", "D2", "E2", "F2", "G2", "H2", "I2"}, "\n")
	scanner := bufio.NewScanner(strings.NewReader(sequences))
	buf_Seq := make([]string, 0, read_buff)
	for n := 1; scanner.Scan(); n++ {
		line := scanner.Text()
		buf_Seq = append(buf_Seq, line) // store the sequences into buf_Seq
		if n == read_buff { // when the read buffer is full, one goroutine analyses it
			wg.Add(1)
			temp := make([]string, n)
			copy(temp, buf_Seq)
			buf_Seq = buf_Seq[:0] // reset buf_Seq
			n = 0                 // reset the sequences counter
			go func(genes, Seq []string) {
				defer wg.Done()
				fmt.Println(Seq)
				queue <- search_gene2(genes, Seq)
			}(genes, temp)
		}
	}
	go func() {
		wg.Wait()
		close(queue)
	}()
	slice := []string{}
	for t := range queue {
		slice = append(slice, t...)
	}
	fmt.Println(slice)
}
Output (note problem 5: where is I2?):
[A2 B2]
[C2 D2]
[E2 F2]
[G2 H2]
[A2 B2 C2 D2 E2 F2 G2 H2]
This is the solution to your main problem (make a new slice and copy all the data):
temp := make([]string, n)
copy(temp, buf_Seq)
buf_Seq = buf_Seq[:0] // reset buf_Seq
n = 0                 // reset the sequences counter
go func(genes, Seq []string) {
	defer wg.Done()
	fmt.Println(Seq)
	queue <- search_gene2(genes, Seq)
}(genes, temp)
The reason for:
Found 1 data race(s)
exit status 66
line 71 is : copy(bufCopy, buf_Seq)
line 63 is : buf_Seq = append(buf_Seq, line)
line 73 is :}(genes, buf_Seq)
is, as the other answers said: you were sharing the same underlying slice array with all the goroutines.
Hope this helps.