在时间间隔和通道长度之间选择

Selecting between time interval and length of channel

我来这里是为了找出执行后续任务的最惯用的方法。

任务:

将数据从通道写入文件。

问题

我有频道ch := make(chan int, 100)

我需要从通道读取并将我从通道读取的值写入文件。我的问题基本上是鉴于

我该怎么做
  1. 如果通道 ch 已满,立即写入值
  2. 如果通道ch未满,则每5s写入一次。

所以本质上,至少每5s需要将数据写入文件(假设至少每5s将数据填充到通道中)

使用 selectforrange 完成上述任务的最佳方法是什么?

谢谢!

没有"buffer of channel is full"这样的"event",所以你无法检测到那个[*]。这意味着您无法仅使用 1 个通道以惯用的方式解决您的语言原语问题。

[*] 不完全正确:您可以通过使用 selectdefault 情况下 发送 在频道上,但这需要发件人的逻辑,并重复发送尝试。

我会使用另一个通道,当值在其上发送时,我会从该通道接收值,然后 "redirect",将值存储在另一个通道中,如您所述,该通道的缓冲区为 100。在每次重定向时,您可以检查内部通道的缓冲区是否已满,如果是,则立即写入。如果没有,继续监视 "incoming" 通道和带有 select 语句的计时器通道,如果计时器触发,则执行 "regular" 写入。

您可以使用len(chInternal)检查chInternal通道中有多少元素,并使用cap(chInternal)检查其容量。请注意,这是 "safe" 因为我们是唯一处理 chInternal 通道的 goroutine。如果有多个 goroutine,由 len(chInternal) 编辑的值 return 在我们将它用于某些东西(例如比较它)时可能已经过时了。

在此解决方案中 chInternal(如其名称所示)仅供内部使用。其他人应该只在 ch 上发送值。请注意,ch 可能是也可能不是缓冲通道,解决方案适用于这两种情况。但是,如果您也给 ch 一些缓冲区,您可能会提高效率(这样发件人被阻止的机会就会降低)。

var (
    chInternal = make(chan int, 100)
    ch         = make(chan int) // You may (should) make this a buffered channel too
)

func main() {
    delay := time.Second * 5
    timer := time.NewTimer(delay)
    for {
        select {
        case v := <-ch:
            chInternal <- v
            if len(chInternal) == cap(chInternal) {
                doWrite() // Buffer is full, we need to write immediately
                timer.Reset(delay)
            }
        case <-timer.C:
            doWrite() // "Regular" write: 5 seconds have passed since last write
            timer.Reset(delay)
        }
    }
}

如果发生立即写入(由于 "buffer full" 情况),此解决方案会将下一个 "regular" 写入计时在此之后 5 秒。如果您不想这样,并且希望 5 秒的常规写入独立于立即写入,则不要在立即写入后重置计时器。

doWrite()的实现可能如下:

var f *os.File // Make sure to open file for writing

func doWrite() {
    for {
        select {
        case v := <-chInternal:
            fmt.Fprintf(f, "%d ", v) // Write v to the file
        default: // Stop when no more values in chInternal
            return
        }
    }
}

我们不能使用for ... range,因为通道关闭时只有return秒,但我们的chInternal通道没有关闭。所以我们使用 selectdefault 的情况,所以当 chInternal 的缓冲区中没有更多值时,我们 return.

改进

使用切片而不是第二通道

由于 chInternal 通道仅供我们使用,并且仅在单个 goroutine 上使用,我们也可以选择使用单个 []int 切片而不是通道(reading/writing切片比通道快得多)。

仅显示不同/更改的部分,它可能看起来像这样:

var (
    buf = make([]int, 0, 100)
)

func main() {
    // ...

    for {
        select {
        case v := <-ch:
            buf = append(buf, v)
            if len(buf) == cap(buf) {
            // ...
    }
}

func doWrite() {
    for _, v := range buf {
        fmt.Fprintf(f, "%d ", v) // Write v to the file
    }
    buf = buf[:0] // "Clear" the buffer
}

有多个 goroutines

如果我们坚持留下 chInternal 一个频道,doWrite() 函数可能会在另一个 goroutine 上被调用而不阻塞另一个 goroutine,例如go doWrite()。由于要写入的数据是从通道 (chInternal) 读取的,因此不需要进一步同步。

如果只使用5秒写入,提高文件写入性能,
您可以随时填写频道,
然后 writer goroutine 将该数据写入缓冲文件,
在不使用计时器的情况下查看这个非常简单且惯用的示例
只是使用...范围

package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
)

var wg sync.WaitGroup

func WriteToFile(filename string, ch chan int) {
    f, e := os.Create(filename)
    if e != nil {
        panic(e)
    }
    w := bufio.NewWriterSize(f, 4*1024*1024)
    defer wg.Done()
    defer f.Close()
    defer w.Flush()
    for v := range ch {
        fmt.Fprintf(w, "%d ", v)
    }
}

func main() {
    ch := make(chan int, 100)
    wg.Add(1)
    go WriteToFile("file.txt", ch)

    for i := 0; i < 500000; i++ {
        ch <- i // do the job
    }
    close(ch) // Finish the job and close output file
    wg.Wait()
}

并注意 defer 的顺序。

如果是 5 秒写入,你可以添加一个间隔定时器来将这个文件的缓冲区刷新到磁盘,像这样:

package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup

func WriteToFile(filename string, ch chan int) {
    f, e := os.Create(filename)
    if e != nil {
        panic(e)
    }
    w := bufio.NewWriterSize(f, 4*1024*1024)

    ticker := time.NewTicker(5 * time.Second)
    quit := make(chan struct{})
    go func() {
        for {
            select {
            case <-ticker.C:
                if w.Buffered() > 0 {
                    fmt.Println(w.Buffered())
                    w.Flush()
                }
            case <-quit:
                ticker.Stop()
                return
            }
        }
    }()

    defer wg.Done()
    defer f.Close()
    defer w.Flush()
    defer close(quit)
    for v := range ch {
        fmt.Fprintf(w, "%d ", v)
    }
}

func main() {
    ch := make(chan int, 100)
    wg.Add(1)
    go WriteToFile("file.txt", ch)

    for i := 0; i < 25; i++ {
        ch <- i // do the job
        time.Sleep(500 * time.Millisecond)
    }
    close(ch) // Finish the job and close output file
    wg.Wait()
}

这里我使用了time.NewTicker(5 * time.Second)作为带quit通道的间隔计时器,你可以使用time.AfterFunc()time.Tick()time.Sleep()

经过一些优化(删除退出频道):

package main

import (
    "bufio"
    "fmt"
    "os"
    "sync"
    "time"
)

var wg sync.WaitGroup

func WriteToFile(filename string, ch chan int) {
    f, e := os.Create(filename)
    if e != nil {
        panic(e)
    }
    w := bufio.NewWriterSize(f, 4*1024*1024)
    ticker := time.NewTicker(5 * time.Second)
    defer wg.Done()
    defer f.Close()
    defer w.Flush()

    for {
        select {
        case v, ok := <-ch:
            if ok {
                fmt.Fprintf(w, "%d ", v)
            } else {
                fmt.Println("done.")
                ticker.Stop()
                return
            }
        case <-ticker.C:
            if w.Buffered() > 0 {
                fmt.Println(w.Buffered())
                w.Flush()
            }
        }
    }
}
func main() {
    ch := make(chan int, 100)
    wg.Add(1)
    go WriteToFile("file.txt", ch)

    for i := 0; i < 25; i++ {
        ch <- i // do the job
        time.Sleep(500 * time.Millisecond)
    }
    close(ch) // Finish the job and close output file
    wg.Wait()
}

希望对您有所帮助。