关闭具有循环依赖性的通道

Closing channels with cyclical dependencies

我正在尝试在 Golang 中实现类似 mapreduce 的方法。我的设计如下:

这导致 mapper 输出和 reduce 输入之间的循环依赖,我现在不知道如何发出 mapper 输出完成的信号(并关闭通道)。

打破这种循环依赖或了解何时关闭具有这种循环行为的通道的最佳方法是什么?

下面的代码有一个死锁,map 输出通道和 reduce 输入通道相互等待。

type MapFn func(input int) (int, int)
type ReduceFn func(a int, b int) int

type kvPair struct {
    k int
    v int
}

type reducePair struct {
    k  int
    v1 int
    v2 int
}

func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    reduceInputChan := make(chan *reducePair)
    outputMapMap := make(map[int]int)
    go func() {
        for v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()
    for i := 0; i < nMappers; i++ {
        go func() {
            for v := range inputMapChan {
                k, v := mapFn(v)
                outputMapChan <- &kvPair{k, v}
            }
        }()
    }
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
            }
        }()
    }
    for v := range outputMapChan {
        key := v.k
        value := v.v
        other, ok := outputMapMap[key]
        if ok {
            delete(outputMapMap, key)
            reduceInputChan <- &reducePair{key, value, other}
        } else {
            outputMapMap[key] = value
        }
    }
    return outputMapMap, nil
}

试试这个:

package main

import "fmt"
import "sync"
import "sync/atomic"
import "runtime"
import "math/rand"
import "time"

type MapFn func(input int) *kvPair
type ReduceFn func(a int, b int) int

type kvPair struct {
    k int
    v int
}

type reducePair struct {
    k  int
    v1 int
    v2 int
}

func MapReduce(mapFn MapFn, reduceFn ReduceFn, input []int, nMappers int, nReducers int) (map[int]int, error) {
    inputMapChan := make(chan int, len(input))
    outputMapChan := make(chan *kvPair, len(input))
    reduceInputChan := make(chan *reducePair)
    outputMapMap := make(map[int]int)

    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for _, v := range input {
            inputMapChan <- v
        }
        close(inputMapChan)
    }()

    for i := 0; i < nMappers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for v := range inputMapChan {
                outputMapChan <- mapFn(v)
            }
        }()
    }

    finished := false
    go func() {
        wg.Wait()
        finished = true
    }()

    var count int64
    for i := 0; i < nReducers; i++ {
        go func() {
            for v := range reduceInputChan {
                reduceValue := reduceFn(v.v1, v.v2)
                outputMapChan <- &kvPair{v.k, reduceValue}
                atomic.AddInt64(&count, -1)
            }
        }()
    }

    wg2 := sync.WaitGroup{}
    wg2.Add(1)
    go func() {
        defer wg2.Done()
        for {
            select {
            default:
                if finished && atomic.LoadInt64(&count) == 0 && len(outputMapChan) == 0 {
                    return
                }
                //runtime.Gosched()
            case v := <-outputMapChan:
                key := v.k
                value := v.v
                if other, ok := outputMapMap[key]; ok {
                    delete(outputMapMap, key)
                    atomic.AddInt64(&count, 1)
                    reduceInputChan <- &reducePair{key, value, other}
                } else {
                    outputMapMap[key] = value
                }
            }
        }
    }()

    wg2.Wait()
    return outputMapMap, nil
}

func main() {
    fmt.Println("NumCPU =", runtime.NumCPU())
    t := time.Now()
    a := rand.Perm(1000000)
    //a = []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 12, 13, 1, 16, 2}
    m, err := MapReduce(mp, rdc, a, 2, 2)
    if err != nil {
        panic(err)
    }
    fmt.Println(time.Since(t)) //883ms
    fmt.Println(m)
    fmt.Println("done.")
}

func mp(input int) *kvPair {
    return &kvPair{input & 7, input >> 3}
}
func rdc(a int, b int) int {
    b <<= 3
    if a != 0 {
        b |= a
    }
    return b
}