如何使用 Go 丢弃并发过滤管道中的值?
How to discard values in a concurrent filtering pipeline with Go?
我想知道并了解如何使用 go 执行过滤并发管道
在 producer/consumer 方案中。
我已经设法编写了一个版本来检查一个值,如果没问题,就将它发送到一个频道
如果不是,则将该值发送到另一个通道。
读取和处理值后,两个goroutine负责读取处理后的值
并将它们写入文件。这个版本工作正常。但是...
假设我不想要无效值。有没有办法更改 select 语句(或消费者 goroutine),以便仅
输出正确的值(即仅使用一个输出通道)。我尝试删除那个 invalidValues 频道但是
我没有成功。
我尝试将 select 语句放在 if valid?
中;有一个分支与此版本中的完整语句和 false 分支
只需等待完成频道。通过这种方式,我可以丢弃无效值并使用一个通道,但我并没有用这种方法成功。
关于如何解决这个问题有什么想法吗?
- 此外,在这个方案中,我想知道为什么如果我省略从程序的 invalidValues 通道中删除值的 goroutine
没有完成?是否需要清空通道,否则一直处于阻塞状态?有没有更优雅的方法来做一个范围
价值观?
谢谢!!
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
这里是代码的完整版本
done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)
//Producer reads a file with values and stores them in a channel
go func() {
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(outputStream)
close(invalidValues)
}()
//Write outputStream file
resultFile, err := os.Create("outputStream.txt")
if err != nil {
log.Fatal(err)
}
//Error file
errorFile, err := os.Create("errors.txt")
if err != nil {
log.Fatal(err)
}
//Create two goruotines for writing the outputStream file
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
//Write outputStream and error to files
for r := range outputStream {
_, err := resultFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
resultFile.Close()
wg2.Done()
}()
go func() {
for r := range invalidValues {
_, err := errorFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
errorFile.Close()
wg2.Done()
}()
wg2.Wait()
删除无效频道:
for value := range inputStream {
var c *chan string
if valid := checkValue(value); valid {
select {
case outputStream <- value
case <-done:
return
}
}
}
如果删除无效值 reader goroutine,则必须将等待组更改为:
wg2.Add(1)
所以你不会无限期地等待。
我想知道并了解如何使用 go 执行过滤并发管道 在 producer/consumer 方案中。
我已经设法编写了一个版本来检查一个值,如果没问题,就将它发送到一个频道 如果不是,则将该值发送到另一个通道。
读取和处理值后,两个goroutine负责读取处理后的值 并将它们写入文件。这个版本工作正常。但是...
假设我不想要无效值。有没有办法更改 select 语句(或消费者 goroutine),以便仅 输出正确的值(即仅使用一个输出通道)。我尝试删除那个 invalidValues 频道但是 我没有成功。
我尝试将 select 语句放在
if valid?
中;有一个分支与此版本中的完整语句和 false 分支 只需等待完成频道。通过这种方式,我可以丢弃无效值并使用一个通道,但我并没有用这种方法成功。
关于如何解决这个问题有什么想法吗?
- 此外,在这个方案中,我想知道为什么如果我省略从程序的 invalidValues 通道中删除值的 goroutine 没有完成?是否需要清空通道,否则一直处于阻塞状态?有没有更优雅的方法来做一个范围 价值观?
谢谢!!
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
这里是代码的完整版本
done := make(chan struct{})
defer close(done)
inputStream := make(chan string)
outputStream := make(chan string)
invalidValues := make(chan string)
//Producer reads a file with values and stores them in a channel
go func() {
count := 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
inputStream <- strings.TrimSpace(scanner.Text())
count = count + 1
}
close(inputStream)
}()
//Consumers
var wg sync.WaitGroup
wg.Add(Workers)
for i := 0; i < Workers; i++ {
// Deploy #Workers to read from the inputStream perform validation and output the valid results to one channel and the invalid to another
go func() {
for value := range inputStream {
var c *chan string
dataToWrite := value
if valid := checkValue(value); valid {
dataToWrite = value
c = &outputStream
} else {
c = &invalidValues
}
select {
case *c <- dataToWrite:
case <-done:
return
}
time.Sleep(time.Duration(5) * time.Second)
}
wg.Done()
}()
}
go func() {
wg.Wait()
close(outputStream)
close(invalidValues)
}()
//Write outputStream file
resultFile, err := os.Create("outputStream.txt")
if err != nil {
log.Fatal(err)
}
//Error file
errorFile, err := os.Create("errors.txt")
if err != nil {
log.Fatal(err)
}
//Create two goruotines for writing the outputStream file
var wg2 sync.WaitGroup
wg2.Add(2)
go func() {
//Write outputStream and error to files
for r := range outputStream {
_, err := resultFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
resultFile.Close()
wg2.Done()
}()
go func() {
for r := range invalidValues {
_, err := errorFile.WriteString(r + "\n")
if err != nil {
log.Fatal(err)
}
}
errorFile.Close()
wg2.Done()
}()
wg2.Wait()
删除无效频道:
for value := range inputStream {
var c *chan string
if valid := checkValue(value); valid {
select {
case outputStream <- value
case <-done:
return
}
}
}
如果删除无效值 reader goroutine,则必须将等待组更改为:
wg2.Add(1)
所以你不会无限期地等待。