从 goroutine 获取值并取消另一个 goroutine
Getting value from goroutine and canceling another goroutine
我有一个案例,我正在从 2 个不同的位置(ES 和 REDIS)读取数据,我需要从最快的源读取一个值,因此我触发了 2 个 goroutines,一个是从 ES 获取数据,另一个从 REDIS 获取。
一旦从其中一个 goroutine 中获取了数据,就必须完全取消另一个 goroutine,以免浪费 CPU。
简化:
func A(){
go funcB(){
}()
go funcC(){
}()
data := <-channel //
}
现在一旦收到数据,funcA
或 funcB
必须取消,无论他们在做什么(我不再关心他们的输出,他们只是在浪费 CPU)
最有效的方法是什么?
可以只使用频道来完成吗?
根据您的实际用例,您有一些选项:
1- 使用两个 goroutines:
这需要sync/Lock
:
试试这个模拟样本 (The Go Playground):
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().Unix())
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { ES <- 101 })
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { REDIS <- 102 })
go B()
go C()
data := <-channel
fmt.Println(data)
}
func B() {
check := true
data := 0
for {
select {
case <-quit:
return
case data = <-ES: // receive data
}
if check {
mx.Lock()
//defer mx.Unlock()
if mx.done {
mx.Unlock()
return
}
check = false
close(quit)
mx.done = true
mx.Unlock()
}
fmt.Println("ES ready")
channel <- data
}
}
func C() {
check := true
data := 0
for {
select {
case <-quit:
return
case data = <-REDIS: // receive data
}
if check {
mx.Lock()
//defer mx.Unlock()
if mx.done {
mx.Unlock()
return
}
check = false
close(quit)
mx.done = true
mx.Unlock()
}
fmt.Println("REDIS ready")
channel <- data
}
}
var (
channel = make(chan int)
ES = make(chan int)
REDIS = make(chan int)
quit = make(chan struct{})
mx lockdown
)
type lockdown struct {
sync.Mutex
done bool
}
2- 在此示例中,您只需启动一个 goroutine B
或 C
:
看这个伪代码:
func main() {
go A()
data := <-channel
fmt.Println(data)
}
func A() {
for{
if ES ready
go B(data)
return
if REDIS ready
go C(data)
return
}
}
您可以启动 A
goroutine,在 A
goroutine 中它会检测哪个输入准备就绪,例如ES
或 REDIS
,然后相应地启动 B
或 C
goroutine:
试试这个模拟样本 (The Go Playground):
AfterFunc 只是为了模拟,在实际代码中你不需要它,它模拟一个输入的随机时序。
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
rand.Seed(time.Now().Unix())
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { ES <- 101 })
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { REDIS <- 102 })
go A()
data := <-channel
fmt.Println(data)
}
func A() {
select {
case data := <-ES:
go B(data)
return
case data := <-REDIS:
go C(data)
return
}
}
func B(data int) {
for {
fmt.Println("ES ready")
channel <- data
data = <-ES
}
}
func C(data int) {
for {
fmt.Println("REDIS ready")
channel <- data
data = <-REDIS
}
}
var (
channel = make(chan int)
ES = make(chan int)
REDIS = make(chan int)
)
运行 1:
的输出
REDIS ready
102
来自 运行 2 的输出:
ES ready
101
context package 为此提供了取消、超时和截止日期上下文。在这里你可以看到一个取消的例子,我们等待较慢的 goroutine 打印取消的消息:
ctx, cancel := context.WithCancel(context.Background())
// buffer the channel for extra results returned before cancelation
data := make(chan string, 2)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-time.After(100 * time.Millisecond):
data <- "A complete"
case <-ctx.Done():
fmt.Println("A cancelled")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-time.After(200 * time.Millisecond):
data <- "B complete"
case <-ctx.Done():
fmt.Println("B cancelled")
}
}()
resp := <-data
cancel()
fmt.Println(resp)
wg.Wait()
我有一个案例,我正在从 2 个不同的位置(ES 和 REDIS)读取数据,我需要从最快的源读取一个值,因此我触发了 2 个 goroutines,一个是从 ES 获取数据,另一个从 REDIS 获取。
一旦从其中一个 goroutine 中获取了数据,就必须完全取消另一个 goroutine,以免浪费 CPU。
简化:
func A(){
go funcB(){
}()
go funcC(){
}()
data := <-channel //
}
现在一旦收到数据,funcA
或 funcB
必须取消,无论他们在做什么(我不再关心他们的输出,他们只是在浪费 CPU)
最有效的方法是什么? 可以只使用频道来完成吗?
根据您的实际用例,您有一些选项:
1- 使用两个 goroutines:
这需要sync/Lock
:
试试这个模拟样本 (The Go Playground):
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
rand.Seed(time.Now().Unix())
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { ES <- 101 })
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { REDIS <- 102 })
go B()
go C()
data := <-channel
fmt.Println(data)
}
func B() {
check := true
data := 0
for {
select {
case <-quit:
return
case data = <-ES: // receive data
}
if check {
mx.Lock()
//defer mx.Unlock()
if mx.done {
mx.Unlock()
return
}
check = false
close(quit)
mx.done = true
mx.Unlock()
}
fmt.Println("ES ready")
channel <- data
}
}
func C() {
check := true
data := 0
for {
select {
case <-quit:
return
case data = <-REDIS: // receive data
}
if check {
mx.Lock()
//defer mx.Unlock()
if mx.done {
mx.Unlock()
return
}
check = false
close(quit)
mx.done = true
mx.Unlock()
}
fmt.Println("REDIS ready")
channel <- data
}
}
var (
channel = make(chan int)
ES = make(chan int)
REDIS = make(chan int)
quit = make(chan struct{})
mx lockdown
)
type lockdown struct {
sync.Mutex
done bool
}
2- 在此示例中,您只需启动一个 goroutine B
或 C
:
看这个伪代码:
func main() {
go A()
data := <-channel
fmt.Println(data)
}
func A() {
for{
if ES ready
go B(data)
return
if REDIS ready
go C(data)
return
}
}
您可以启动 A
goroutine,在 A
goroutine 中它会检测哪个输入准备就绪,例如ES
或 REDIS
,然后相应地启动 B
或 C
goroutine:
试试这个模拟样本 (The Go Playground):
AfterFunc 只是为了模拟,在实际代码中你不需要它,它模拟一个输入的随机时序。
package main
import (
"fmt"
"math/rand"
"time"
)
func main() {
rand.Seed(time.Now().Unix())
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { ES <- 101 })
time.AfterFunc(time.Duration(rand.Intn(1000))*time.Millisecond, func() { REDIS <- 102 })
go A()
data := <-channel
fmt.Println(data)
}
func A() {
select {
case data := <-ES:
go B(data)
return
case data := <-REDIS:
go C(data)
return
}
}
func B(data int) {
for {
fmt.Println("ES ready")
channel <- data
data = <-ES
}
}
func C(data int) {
for {
fmt.Println("REDIS ready")
channel <- data
data = <-REDIS
}
}
var (
channel = make(chan int)
ES = make(chan int)
REDIS = make(chan int)
)
运行 1:
的输出REDIS ready
102
来自 运行 2 的输出:
ES ready
101
context package 为此提供了取消、超时和截止日期上下文。在这里你可以看到一个取消的例子,我们等待较慢的 goroutine 打印取消的消息:
ctx, cancel := context.WithCancel(context.Background())
// buffer the channel for extra results returned before cancelation
data := make(chan string, 2)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-time.After(100 * time.Millisecond):
data <- "A complete"
case <-ctx.Done():
fmt.Println("A cancelled")
}
}()
wg.Add(1)
go func() {
defer wg.Done()
select {
case <-time.After(200 * time.Millisecond):
data <- "B complete"
case <-ctx.Done():
fmt.Println("B cancelled")
}
}()
resp := <-data
cancel()
fmt.Println(resp)
wg.Wait()