在完成之前去例行结束

go routine end before done

我想用多个 go 例程异步执行事情。我传入 "threads" 的数量以用于异步处理文件。文件是要处理的字符串数组。

queue := make(chan string)

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
} 

log.Infof("Starting %i processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue)
}

for _, f := range files {
    queue <- f
}
close(queue)

例程本身如下所示:

func renderGoRoutine(queue chan string) {
    for file := range queue { 
        // do some heavy lifting stuff with the file
    }
}

只要我只使用一个线程,它就可以正常工作。一旦我拿了一个以上,它就会在完成所有 go 例程之前执行 exit/leave 范围。

如何让它处理一切?

上一题:

您正在使用该频道发布待完成的工作。一旦从队列中取出最后一项(未完成处理),您的程序就会退出。

您可以使用通道在 renderGoRoutine 结束时写入以表示处理结束。

顶部:

sync := make(chan bool)

renderGoRoutine最后(假设在同一个文件中):

sync <- true

底部:

for f := range sync {
    <- sync
}

现在您的程序会一直等待,直到处理完文件数量。

或者有一个完整的例子:

queue := make(chan string)
sync := make(chan bool)

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
} 

log.Infof("Starting %i processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue)
}

for _, f := range files {
    queue <- f
}
close(queue)

for f := range sync {
    <- sync
}

而且例程应该改成这样:

func renderGoRoutine(queue chan string) {
    for file := range queue { 
        // do some heavy lifting stuff with the file
        sync <- true
    }
}

使用 WaitGroups 是一种选择。

开始时,您将多个任务添加到 WaitGroup 中,并在每个任务完成后递减 WaitGroup 中的计数器。等到代码流结束时所有任务都完成。

看例子:https://godoc.org/sync#WaitGroup

您的代码将如下所示:

queue := make(chan string)

wg := sync.WaitGroup{}
wg.Add(len(files))

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
}

log.Infof("Starting %i processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue)
}


for _, f := range files {
    queue <- f
}

close(queue)
wg.Wait()

renderGoRoutine:

func renderGoRoutine(queue chan string) {
    for file := range queue {
        // do some heavy lifting stuff with the file
        // decrement the waitGroup counter
        wg.Done()
    }
}

我确实忘记等待所有任务完成。这可以简单地通过等待所有循环结束来完成。由于 close(channel) 确实结束了 for range channel,可以像这样使用与频道的简单同步:

sync := make(chan bool)
queue := make(chan string)

threadCount := c.Int("threads")

if c.Int("threads") < len(files) {
    threadCount = len(files)
} 

log.Infof("Starting %i processes", c.Int("threads"))

for i := 0; i < threadCount; i++ {
    go renderGoRoutine(queue)
}

for _, f := range files {
    queue <- f
}
close(queue)

for i := 0; i < threadCount; i++ {
    <- sync
}

最后但同样重要的是,每当迭代停止时写入通道。

func renderGoRoutine(queue chan string) {
    for file := range queue { //whatever is done here
    }
    sync <- true
}