从 goroutine 获取值并取消另一个 goroutine

Getting value from goroutine and canceling another goroutine

我有一个案例,我正在从 2 个不同的位置(ES 和 REDIS)读取数据,我需要从最快的源读取一个值,因此我触发了 2 个 goroutines,一个是从 ES 获取数据,另一个从 REDIS 获取。

一旦从其中一个 goroutine 中获取了数据,就必须完全取消另一个 goroutine,以免浪费 CPU。

简化:

func A(){
    go funcB(){

    }()

    go funcC(){

    }()

    data := <-channel // 
}

现在一旦收到数据,funcAfuncB 必须取消,无论他们在做什么(我不再关心他们的输出,他们只是在浪费 CPU)

最有效的方法是什么? 可以只使用频道来完成吗?

根据您的实际用例,您有一些选项:

1- 使用两个 goroutines:

这需要sync/Lock:
试试这个模拟样本 (The Go Playground):

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    rand.Seed(time.Now().Unix())
    time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { ES <- 101 })
    time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { REDIS <- 102 })

    go B()
    go C()

    data := <-channel

    fmt.Println(data)

}

func B() {
    check := true
    data := 0
    for {
        select {
        case <-quit:
            return
        case data = <-ES: // receive data
        }
        if check {
            mx.Lock()
            //defer mx.Unlock()
            if mx.done {
                mx.Unlock()
                return
            }
            check = false
            close(quit)
            mx.done = true
            mx.Unlock()
        }
        fmt.Println("ES ready")
        channel <- data
    }
}

func C() {
    check := true
    data := 0
    for {
        select {
        case <-quit:
            return
        case data = <-REDIS: // receive data
        }
        if check {
            mx.Lock()
            //defer mx.Unlock()
            if mx.done {
                mx.Unlock()
                return
            }
            check = false
            close(quit)
            mx.done = true
            mx.Unlock()
        }
        fmt.Println("REDIS ready")
        channel <- data
    }
}

var (
    channel = make(chan int)
    ES      = make(chan int)
    REDIS   = make(chan int)
    quit    = make(chan struct{})
    mx      lockdown
)

type lockdown struct {
    sync.Mutex
    done bool
}

2- 在此示例中,您只需启动一个 goroutine BC:
看这个伪代码:

func main() { 
    go A()
    data := <-channel
    fmt.Println(data)
}

func A() {
    for{
        if ES ready 
            go B(data)
            return
        if REDIS ready
            go C(data)
            return
    }
}

您可以启动 A goroutine,在 A goroutine 中它会检测哪个输入准备就绪,例如ESREDIS,然后相应地启动 BC goroutine:

试试这个模拟样本 (The Go Playground):
AfterFunc 只是为了模拟,在实际代码中你不需要它,它模拟一个输入的随机时序。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    rand.Seed(time.Now().Unix())
    time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { ES <- 101 })
    time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { REDIS <- 102 })
    go A()

    data := <-channel

    fmt.Println(data)

}

func A() {
    select {
    case data := <-ES:
        go B(data)
        return
    case data := <-REDIS:
        go C(data)
        return
    }
}

func B(data int) {
    for {
        fmt.Println("ES ready")
        channel <- data
        data = <-ES
    }
}
func C(data int) {
    for {
        fmt.Println("REDIS ready")
        channel <- data
        data = <-REDIS
    }
}

var (
    channel = make(chan int)
    ES      = make(chan int)
    REDIS   = make(chan int)
)

运行 1:

的输出
REDIS ready
102

来自 运行 2 的输出:

ES ready
101

context package 为此提供了取消、超时和截止日期上下文。在这里你可以看到一个取消的例子,我们等待较慢的 goroutine 打印取消的消息:

ctx, cancel := context.WithCancel(context.Background())

// buffer the channel for extra results returned before cancelation
data := make(chan string, 2)

var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    select {
    case <-time.After(100 * time.Millisecond):
        data <- "A complete"
    case <-ctx.Done():
        fmt.Println("A cancelled")
    }
}()

wg.Add(1)
go func() {
    defer wg.Done()
    select {
    case <-time.After(200 * time.Millisecond):
        data <- "B complete"
    case <-ctx.Done():
        fmt.Println("B cancelled")
    }
}()

resp := <-data
cancel()
fmt.Println(resp)
wg.Wait()

https://play.golang.org/p/vAhksjKozW