如何使用 Go 丢弃并发过滤管道中的值?

How to discard values in a concurrent filtering pipeline with Go?

我想知道并了解如何使用 go 执行过滤并发管道 在 producer/consumer 方案中。

我已经设法编写了一个版本来检查一个值,如果没问题,就将它发送到一个频道 如果不是,则将该值发送到另一个通道。

读取和处理值后,两个goroutine负责读取处理后的值 并将它们写入文件。这个版本工作正常。但是...

  1. 假设我不想要无效值。有没有办法更改 select 语句(或消费者 goroutine),以便仅 输出正确的值(即仅使用一个输出通道)。我尝试删除那个 invalidValues 频道但是 我没有成功。

  2. 我尝试将 select 语句放在 if valid? 中;有一个分支与此版本中的完整语句和 false 分支 只需等待完成频道。通过这种方式,我可以丢弃无效值并使用一个通道,但我并没有用这种方法成功。

关于如何解决这个问题有什么想法吗?

  1. 此外,在这个方案中,我想知道为什么如果我省略从程序的 invalidValues 通道中删除值的 goroutine 没有完成?是否需要清空通道,否则一直处于阻塞状态?有没有更优雅的方法来做一个范围 价值观?

谢谢!!

//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
    // Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
    go func() {
        for value := range inputStream {
            var c *chan string
            dataToWrite := value
            if valid := checkValue(value); valid {
                dataToWrite = value
                c = &outputStream
            } else {
                c = &invalidValues
            }
            select {
            case *c <- dataToWrite:
            case <-done:
                return
            }
            time.Sleep(time.Duration(5) * time.Second)
        }
        wg.Done()
    }()
}

这里是代码的完整版本

done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)

//Producer reads a file with values and stores them in a channel
go func() {
    count := 0
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        inputStream <- strings.TrimSpace(scanner.Text())
        count = count + 1
    }
    close(inputStream)
}()

//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
    // Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
    go func() {
        for value := range inputStream {
            var c *chan string
            dataToWrite := value
            if valid := checkValue(value); valid {
                dataToWrite = value
                c = &outputStream
            } else {
                c = &invalidValues
            }
            select {
            case *c <- dataToWrite:
            case <-done:
                return
            }
            time.Sleep(time.Duration(5) * time.Second)
        }
        wg.Done()
    }()
}

go func() {
    wg.Wait()
    close(outputStream)
    close(invalidValues)
}()

//Write outputStream file
resultFile, err := os.Create("outputStream.txt")
if err != nil {
    log.Fatal(err)
}

//Error file
errorFile, err := os.Create("errors.txt")
if err != nil {
    log.Fatal(err)
}

//Create two goruotines for writing the outputStream file
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
    //Write outputStream and error to files
    for r := range outputStream {
        _, err := resultFile.WriteString(r + "\n")
        if err != nil {
            log.Fatal(err)
        }
    }
    resultFile.Close()
    wg2.Done()
}()

go func() {
    for r := range invalidValues {
        _, err := errorFile.WriteString(r + "\n")
        if err != nil {
            log.Fatal(err)
        }
    }
    errorFile.Close()
    wg2.Done()
}()
wg2.Wait()

删除无效频道:

 for value := range inputStream {
      var c *chan string
      if valid := checkValue(value); valid {
            select {
            case outputStream <- value
            case <-done:
                return
            }
        }
 }

如果删除无效值 reader goroutine,则必须将等待组更改为:

wg2.Add(1)

所以你不会无限期地等待。