在时间间隔和通道长度之间选择
Selecting between time interval and length of channel
我来这里是为了找出执行后续任务的最惯用的方法。
任务:
将数据从通道写入文件。
问题:
我有频道ch := make(chan int, 100)
我需要从通道读取并将我从通道读取的值写入文件。我的问题基本上是鉴于
我该怎么做
- 如果通道
ch
已满,立即写入值
- 如果通道
ch
未满,则每5s写入一次。
所以本质上,至少每5s需要将数据写入文件(假设至少每5s将数据填充到通道中)
使用 select
、for
和 range
完成上述任务的最佳方法是什么?
谢谢!
没有"buffer of channel is full"这样的"event",所以你无法检测到那个[*]。这意味着您无法仅使用 1 个通道以惯用的方式解决您的语言原语问题。
[*] 不完全正确:您可以通过使用 select
和 default
情况下 发送 在频道上,但这需要发件人的逻辑,并重复发送尝试。
我会使用另一个通道,当值在其上发送时,我会从该通道接收值,然后 "redirect",将值存储在另一个通道中,如您所述,该通道的缓冲区为 100。在每次重定向时,您可以检查内部通道的缓冲区是否已满,如果是,则立即写入。如果没有,继续监视 "incoming" 通道和带有 select
语句的计时器通道,如果计时器触发,则执行 "regular" 写入。
您可以使用len(chInternal)
检查chInternal
通道中有多少元素,并使用cap(chInternal)
检查其容量。请注意,这是 "safe" 因为我们是唯一处理 chInternal
通道的 goroutine。如果有多个 goroutine,由 len(chInternal)
编辑的值 return 在我们将它用于某些东西(例如比较它)时可能已经过时了。
在此解决方案中 chInternal
(如其名称所示)仅供内部使用。其他人应该只在 ch
上发送值。请注意,ch
可能是也可能不是缓冲通道,解决方案适用于这两种情况。但是,如果您也给 ch
一些缓冲区,您可能会提高效率(这样发件人被阻止的机会就会降低)。
var (
chInternal = make(chan int, 100)
ch = make(chan int) // You may (should) make this a buffered channel too
)
func main() {
delay := time.Second * 5
timer := time.NewTimer(delay)
for {
select {
case v := <-ch:
chInternal <- v
if len(chInternal) == cap(chInternal) {
doWrite() // Buffer is full, we need to write immediately
timer.Reset(delay)
}
case <-timer.C:
doWrite() // "Regular" write: 5 seconds have passed since last write
timer.Reset(delay)
}
}
}
如果发生立即写入(由于 "buffer full" 情况),此解决方案会将下一个 "regular" 写入计时在此之后 5 秒。如果您不想这样,并且希望 5 秒的常规写入独立于立即写入,则不要在立即写入后重置计时器。
doWrite()
的实现可能如下:
var f *os.File // Make sure to open file for writing
func doWrite() {
for {
select {
case v := <-chInternal:
fmt.Fprintf(f, "%d ", v) // Write v to the file
default: // Stop when no more values in chInternal
return
}
}
}
我们不能使用for ... range
,因为通道关闭时只有return秒,但我们的chInternal
通道没有关闭。所以我们使用 select
和 default
的情况,所以当 chInternal
的缓冲区中没有更多值时,我们 return.
改进
使用切片而不是第二通道
由于 chInternal
通道仅供我们使用,并且仅在单个 goroutine 上使用,我们也可以选择使用单个 []int
切片而不是通道(reading/writing切片比通道快得多)。
仅显示不同/更改的部分,它可能看起来像这样:
var (
buf = make([]int, 0, 100)
)
func main() {
// ...
for {
select {
case v := <-ch:
buf = append(buf, v)
if len(buf) == cap(buf) {
// ...
}
}
func doWrite() {
for _, v := range buf {
fmt.Fprintf(f, "%d ", v) // Write v to the file
}
buf = buf[:0] // "Clear" the buffer
}
有多个 goroutines
如果我们坚持留下 chInternal
一个频道,doWrite()
函数可能会在另一个 goroutine 上被调用而不阻塞另一个 goroutine,例如go doWrite()
。由于要写入的数据是从通道 (chInternal
) 读取的,因此不需要进一步同步。
如果只使用5秒写入,提高文件写入性能,
您可以随时填写频道,
然后 writer goroutine 将该数据写入缓冲文件,
在不使用计时器的情况下查看这个非常简单且惯用的示例
只是使用...范围:
package main
import (
"bufio"
"fmt"
"os"
"sync"
)
var wg sync.WaitGroup
func WriteToFile(filename string, ch chan int) {
f, e := os.Create(filename)
if e != nil {
panic(e)
}
w := bufio.NewWriterSize(f, 4*1024*1024)
defer wg.Done()
defer f.Close()
defer w.Flush()
for v := range ch {
fmt.Fprintf(w, "%d ", v)
}
}
func main() {
ch := make(chan int, 100)
wg.Add(1)
go WriteToFile("file.txt", ch)
for i := 0; i < 500000; i++ {
ch <- i // do the job
}
close(ch) // Finish the job and close output file
wg.Wait()
}
并注意 defer
的顺序。
如果是 5 秒写入,你可以添加一个间隔定时器来将这个文件的缓冲区刷新到磁盘,像这样:
package main
import (
"bufio"
"fmt"
"os"
"sync"
"time"
)
var wg sync.WaitGroup
func WriteToFile(filename string, ch chan int) {
f, e := os.Create(filename)
if e != nil {
panic(e)
}
w := bufio.NewWriterSize(f, 4*1024*1024)
ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
if w.Buffered() > 0 {
fmt.Println(w.Buffered())
w.Flush()
}
case <-quit:
ticker.Stop()
return
}
}
}()
defer wg.Done()
defer f.Close()
defer w.Flush()
defer close(quit)
for v := range ch {
fmt.Fprintf(w, "%d ", v)
}
}
func main() {
ch := make(chan int, 100)
wg.Add(1)
go WriteToFile("file.txt", ch)
for i := 0; i < 25; i++ {
ch <- i // do the job
time.Sleep(500 * time.Millisecond)
}
close(ch) // Finish the job and close output file
wg.Wait()
}
这里我使用了time.NewTicker(5 * time.Second)
作为带quit
通道的间隔计时器,你可以使用time.AfterFunc()
或time.Tick()
或time.Sleep()
。
经过一些优化(删除退出频道):
package main
import (
"bufio"
"fmt"
"os"
"sync"
"time"
)
var wg sync.WaitGroup
func WriteToFile(filename string, ch chan int) {
f, e := os.Create(filename)
if e != nil {
panic(e)
}
w := bufio.NewWriterSize(f, 4*1024*1024)
ticker := time.NewTicker(5 * time.Second)
defer wg.Done()
defer f.Close()
defer w.Flush()
for {
select {
case v, ok := <-ch:
if ok {
fmt.Fprintf(w, "%d ", v)
} else {
fmt.Println("done.")
ticker.Stop()
return
}
case <-ticker.C:
if w.Buffered() > 0 {
fmt.Println(w.Buffered())
w.Flush()
}
}
}
}
func main() {
ch := make(chan int, 100)
wg.Add(1)
go WriteToFile("file.txt", ch)
for i := 0; i < 25; i++ {
ch <- i // do the job
time.Sleep(500 * time.Millisecond)
}
close(ch) // Finish the job and close output file
wg.Wait()
}
希望对您有所帮助。
我来这里是为了找出执行后续任务的最惯用的方法。
任务:
将数据从通道写入文件。
问题:
我有频道ch := make(chan int, 100)
我需要从通道读取并将我从通道读取的值写入文件。我的问题基本上是鉴于
我该怎么做- 如果通道
ch
已满,立即写入值 - 如果通道
ch
未满,则每5s写入一次。
所以本质上,至少每5s需要将数据写入文件(假设至少每5s将数据填充到通道中)
使用 select
、for
和 range
完成上述任务的最佳方法是什么?
谢谢!
没有"buffer of channel is full"这样的"event",所以你无法检测到那个[*]。这意味着您无法仅使用 1 个通道以惯用的方式解决您的语言原语问题。
[*] 不完全正确:您可以通过使用 select
和 default
情况下 发送 在频道上,但这需要发件人的逻辑,并重复发送尝试。
我会使用另一个通道,当值在其上发送时,我会从该通道接收值,然后 "redirect",将值存储在另一个通道中,如您所述,该通道的缓冲区为 100。在每次重定向时,您可以检查内部通道的缓冲区是否已满,如果是,则立即写入。如果没有,继续监视 "incoming" 通道和带有 select
语句的计时器通道,如果计时器触发,则执行 "regular" 写入。
您可以使用len(chInternal)
检查chInternal
通道中有多少元素,并使用cap(chInternal)
检查其容量。请注意,这是 "safe" 因为我们是唯一处理 chInternal
通道的 goroutine。如果有多个 goroutine,由 len(chInternal)
编辑的值 return 在我们将它用于某些东西(例如比较它)时可能已经过时了。
在此解决方案中 chInternal
(如其名称所示)仅供内部使用。其他人应该只在 ch
上发送值。请注意,ch
可能是也可能不是缓冲通道,解决方案适用于这两种情况。但是,如果您也给 ch
一些缓冲区,您可能会提高效率(这样发件人被阻止的机会就会降低)。
var (
chInternal = make(chan int, 100)
ch = make(chan int) // You may (should) make this a buffered channel too
)
func main() {
delay := time.Second * 5
timer := time.NewTimer(delay)
for {
select {
case v := <-ch:
chInternal <- v
if len(chInternal) == cap(chInternal) {
doWrite() // Buffer is full, we need to write immediately
timer.Reset(delay)
}
case <-timer.C:
doWrite() // "Regular" write: 5 seconds have passed since last write
timer.Reset(delay)
}
}
}
如果发生立即写入(由于 "buffer full" 情况),此解决方案会将下一个 "regular" 写入计时在此之后 5 秒。如果您不想这样,并且希望 5 秒的常规写入独立于立即写入,则不要在立即写入后重置计时器。
doWrite()
的实现可能如下:
var f *os.File // Make sure to open file for writing
func doWrite() {
for {
select {
case v := <-chInternal:
fmt.Fprintf(f, "%d ", v) // Write v to the file
default: // Stop when no more values in chInternal
return
}
}
}
我们不能使用for ... range
,因为通道关闭时只有return秒,但我们的chInternal
通道没有关闭。所以我们使用 select
和 default
的情况,所以当 chInternal
的缓冲区中没有更多值时,我们 return.
改进
使用切片而不是第二通道
由于 chInternal
通道仅供我们使用,并且仅在单个 goroutine 上使用,我们也可以选择使用单个 []int
切片而不是通道(reading/writing切片比通道快得多)。
仅显示不同/更改的部分,它可能看起来像这样:
var (
buf = make([]int, 0, 100)
)
func main() {
// ...
for {
select {
case v := <-ch:
buf = append(buf, v)
if len(buf) == cap(buf) {
// ...
}
}
func doWrite() {
for _, v := range buf {
fmt.Fprintf(f, "%d ", v) // Write v to the file
}
buf = buf[:0] // "Clear" the buffer
}
有多个 goroutines
如果我们坚持留下 chInternal
一个频道,doWrite()
函数可能会在另一个 goroutine 上被调用而不阻塞另一个 goroutine,例如go doWrite()
。由于要写入的数据是从通道 (chInternal
) 读取的,因此不需要进一步同步。
如果只使用5秒写入,提高文件写入性能,
您可以随时填写频道,
然后 writer goroutine 将该数据写入缓冲文件,
在不使用计时器的情况下查看这个非常简单且惯用的示例
只是使用...范围:
package main
import (
"bufio"
"fmt"
"os"
"sync"
)
var wg sync.WaitGroup
func WriteToFile(filename string, ch chan int) {
f, e := os.Create(filename)
if e != nil {
panic(e)
}
w := bufio.NewWriterSize(f, 4*1024*1024)
defer wg.Done()
defer f.Close()
defer w.Flush()
for v := range ch {
fmt.Fprintf(w, "%d ", v)
}
}
func main() {
ch := make(chan int, 100)
wg.Add(1)
go WriteToFile("file.txt", ch)
for i := 0; i < 500000; i++ {
ch <- i // do the job
}
close(ch) // Finish the job and close output file
wg.Wait()
}
并注意 defer
的顺序。
如果是 5 秒写入,你可以添加一个间隔定时器来将这个文件的缓冲区刷新到磁盘,像这样:
package main
import (
"bufio"
"fmt"
"os"
"sync"
"time"
)
var wg sync.WaitGroup
func WriteToFile(filename string, ch chan int) {
f, e := os.Create(filename)
if e != nil {
panic(e)
}
w := bufio.NewWriterSize(f, 4*1024*1024)
ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
if w.Buffered() > 0 {
fmt.Println(w.Buffered())
w.Flush()
}
case <-quit:
ticker.Stop()
return
}
}
}()
defer wg.Done()
defer f.Close()
defer w.Flush()
defer close(quit)
for v := range ch {
fmt.Fprintf(w, "%d ", v)
}
}
func main() {
ch := make(chan int, 100)
wg.Add(1)
go WriteToFile("file.txt", ch)
for i := 0; i < 25; i++ {
ch <- i // do the job
time.Sleep(500 * time.Millisecond)
}
close(ch) // Finish the job and close output file
wg.Wait()
}
这里我使用了time.NewTicker(5 * time.Second)
作为带quit
通道的间隔计时器,你可以使用time.AfterFunc()
或time.Tick()
或time.Sleep()
。
经过一些优化(删除退出频道):
package main
import (
"bufio"
"fmt"
"os"
"sync"
"time"
)
var wg sync.WaitGroup
func WriteToFile(filename string, ch chan int) {
f, e := os.Create(filename)
if e != nil {
panic(e)
}
w := bufio.NewWriterSize(f, 4*1024*1024)
ticker := time.NewTicker(5 * time.Second)
defer wg.Done()
defer f.Close()
defer w.Flush()
for {
select {
case v, ok := <-ch:
if ok {
fmt.Fprintf(w, "%d ", v)
} else {
fmt.Println("done.")
ticker.Stop()
return
}
case <-ticker.C:
if w.Buffered() > 0 {
fmt.Println(w.Buffered())
w.Flush()
}
}
}
}
func main() {
ch := make(chan int, 100)
wg.Add(1)
go WriteToFile("file.txt", ch)
for i := 0; i < 25; i++ {
ch <- i // do the job
time.Sleep(500 * time.Millisecond)
}
close(ch) // Finish the job and close output file
wg.Wait()
}
希望对您有所帮助。