如何使用 goroutines 接收数据并发回信号

How to receive data and send back signal using goroutines

我想在 Go 中使用 并发性 将数据发送到 goroutine 进行处理和计算 使用通道 。数据点一个接一个地出现在一个函数中,这个函数可以是 main 函数,也可以是某个 sendData 函数。 如果可能,我希望从主函数发送数据。

我想将数据从发送函数发送到一个 goroutine,其中数据存储在一个切片中(我们称这个 goroutine getData)。某些计算是在这个切片上完成的。 在达到某个条件(取决于切片)后,我希望 goroutine 向 sendData 函数发出信号,表明对某批数据点的处理已完成。 并且现在,sendData 函数不断通过通道将数据点发送到 getData goroutine,在 goroutine 中不断构建新的切片,当达到条件时发送信号 - 处理完成并且整个序列保持重复。

举个例子,假设数字形式的数据从 sendData 发送到 getData。条件是 getData 收到的数字的 运行 均值应等于 4。让我们将以下数字序列作为我们的数据 - []int{3, 2, 3, 8, 2, 1, 1, 1, 15}。这里,第一批数字会是{3, 2, 3, 8},因为这些数字按这个顺序发送到getData后,发现数字8之后的运行均值等于4 getData 收到。然后它向 sendData 发送信号。并且发送数据的过程再次开始按数字顺序进行,下一批为{2, 1, 1, 1, 15}。这里,在 getData 收到数字 15 后,它发现 运行 均值等于 4,再次向 sendData 发送信号。 ( 这是一个非常基本的示例 - 在我的实际用例中,输入数据和条件更复杂。我有数据将在 sendData 中实时读取。这里读取每个数据点顺序但每个数据点在前一个数据点之后几微秒到达。因此数据的到达速度很快,我不想在这个函数中做太多的处理和计算。此外,我想保持并发完好无损,因为在读取数据的函数中,数据到达的速度很快。而且,我不希望因为在完成处理的 goroutine 中处理数据而错过这里的数据读取。)

以下是我尝试构建代码的方式:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go sendData(ch)
    go getData(ch)
}

func sendData(ch chan int) {
    syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
    for _, data := range syntheticData {
            ch <- data
        }
}

func getData(ch chan int) {
        dataArr := []int{}
        dataArr = append( dataArr, <-ch )
        fmt.Println(dataArr)

        if mean(dataArr) == 4{
          close(ch)
        }

}

func sum(array []int) int {
    var result int = 0
    for _, v := range array {
        result += v
    }

    return result
}


func mean(array []int) float64 {
    sumArr := float64(sum(array)) / float64(len(array))
    return sumArr
}

我没有用上面的代码实现我想要的功能。 如何在 Go 中实现所需的功能?

如果我对你的问题的理解正确,这应该可以正常工作。但是,这不是一个好的解决方案。我已经注释掉了代码以便于理解。

package main

import "fmt"

func main() {
    // Data channel
    ch := make(chan int)
    // Batch close/create channel for signalling
    bsig := make(chan struct{})

    go put(ch, bsig)
    get(ch, bsig, 4)
}

func put(ch chan<- int, bsig chan struct{}) {
    // Listen for batch creation
    go func() {
        n := 1
        // Receives signal from get on the bidrectional
        // channel, and ping back when done on the same
        // channel.
        for range bsig {
            fmt.Printf("Batch #%d\n", n)
            bsig <- struct{}{}
            n++
        }
    }()

    // Steam of numbers
    stream := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
    for _, s := range stream {
        ch <- s
    }
    close(ch)
}

func get(ch <-chan int, bsig chan struct{}, mean int) {
    num := 0
    sum := 0
    batch := make([]int, 0)
    for s := range ch {
        sum += s
        num++
        batch = append(batch, s)
        // Matches the mean, close batch
        // and signal the sender
        if sum/num == mean {
            // Send ping
            bsig <- struct{}{}
            // Wait for pong?
            <-bsig
            // Print the batch
            fmt.Println(batch)
            // Reset the batch
            sum = 0
            num = 0
            batch = batch[:0]
        }
    }
    // Close the channel
    close(bsig)
}

你只需要 一个 extera 接收 goroutine 例如getData 然后 main goroutine 将在数据到达时使用名为 ch 的通道发送数据,并且您需要一个 buffered 通道来发送信号,例如batchCompleted,还有一个 WaitGroup 等待 getData 同步 ,当它 完成 时。
就这些了,试试it

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int)
    batchCompleted := make(chan struct{}, 1) // non-blocking signaling channel
    wg.Add(1)
    go getData(ch, batchCompleted, wg)

    syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
    i := 0
    check := func() {
        select {
        case <-batchCompleted:
            i++
            fmt.Println(i, " batch completed")
        default:
        }
    }
    for _, data := range syntheticData {
        ch <- data
        check()
    }
    close(ch)
    wg.Wait()
    check()
}

func getData(ch chan int, batchCompleted chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    a := []int{}
    sum, n := 0, 0
    for v := range ch {
        sum += v
        n++
        a = append(a, v)
        if sum == 4*n {
            batchCompleted <- struct{}{}
            fmt.Println(a)
            sum, n = 0, 0
            a = a[:0]
        }
    }
    if len(a) > 0 {
        fmt.Println("remaining data:", a)
    }
}

输出:

[3 2 3 8]
1  batch completed
[2 1 1 1 15]
2  batch completed