如何在 Go 中同时写入两个不同的 csv 文件?
How to write to two different csv files concurrently in Go?
我创建了一个最小的可重现示例
package main
import (
"encoding/csv"
"fmt"
"os"
"strconv"
"sync/atomic"
"time"
)
var (
csvOnePath = "test.csv"
csvTwoPath = "test_two.csv"
)
type A struct {
Running int32 // used atomically
QuitChan chan struct{}
}
func NewA() *A {
return &A{
QuitChan: make(chan struct{}),
}
}
func (a *A) Start() error {
if ok := atomic.CompareAndSwapInt32(&a.Running, 0, 1); !ok {
return fmt.Errorf("Cannot start service A: service already started")
}
go a.record()
return nil
}
func (a *A) Stop() error {
if ok := atomic.CompareAndSwapInt32(&a.Running, 1, 0); !ok {
return fmt.Errorf("Cannot stop service A: service already stopped")
}
close(a.QuitChan)
return nil
}
func (a *A) record() {
//file_one, err := os.OpenFile(csvOnePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
file_one, err := os.Create(csvOnePath)
if err != nil {
fmt.Println(err)
return
}
writer := csv.NewWriter(file_one)
// writer, closeFileFunc, err := NewCsvWriter(csvOnePath)
if err != nil {
fmt.Println(err)
return
}
header := []string{"this", "is", "a", "test"}
err = writer.Write(header)
if err != nil {
fmt.Println(err)
return
}
ticker := time.NewTicker(10*time.Second)
for {
select {
case t := <-ticker.C:
err = writer.Write([]string{fmt.Sprintf("%2d:%2d:%2d", t.Hour(), t.Minute(), t.Second())})
if err != nil {
fmt.Println(err)
a.QuitChan <- struct{}{}
}
case <-a.QuitChan:
ticker.Stop()
writer.Flush()
file_one.Close()
fmt.Println("Stopped recording.")
break
}
}
}
type B struct {
Running int32 // used atomically
QuitChan chan struct{}
}
func NewB() *B {
return &B{
QuitChan: make(chan struct{}),
}
}
func (b *B) Start() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
return fmt.Errorf("Cannot start service B: service already started")
}
go b.record()
return nil
}
func (b *B) Stop() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
return fmt.Errorf("Cannot stop service B: service already stopped")
}
close(b.QuitChan)
return nil
}
func writeMsgToReport(report *csv.Writer, msg string) error {
ct := time.Now()
timestamp := fmt.Sprintf("%2d:%2d:%2d", ct.Hour(), ct.Minute(), ct.Second())
return report.Write([]string{timestamp, msg})
}
func (b *B) record() {
//file_two, err := os.OpenFile(csvTwoPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
file_two, err := os.Create(csvTwoPath)
if err != nil {
fmt.Println(err)
return
}
writer := csv.NewWriter(file_two)
//writer, closeFileFunc, err := NewCsvWriter(csvTwoPath)
if err != nil {
fmt.Println(err)
return
}
header := []string{"this", "is", "a", "second", "test"}
err = writer.Write(header)
if err != nil {
fmt.Println(err)
return
}
ticker := time.NewTicker(1*time.Second)
ticks := 0
for {
select {
case <-ticker.C:
if ticks % 15 == 0 {
err = writeMsgToReport(writer, "YEET "+strconv.Itoa(ticks))
if err != nil {
fmt.Println(err)
b.QuitChan <- struct{}{}
}
}
ticks++
case <-b.QuitChan:
ticker.Stop()
writer.Flush()
file_two.Close()
fmt.Println("Stopped recording.")
break
}
}
}
func main() {
serviceA := NewA()
err := serviceA.Start()
if err != nil {
fmt.Println(err)
return
}
defer serviceA.Stop()
serviceB := NewB()
err = serviceB.Start()
if err != nil {
fmt.Println(err)
return
}
defer serviceB.Stop()
time.Sleep(600*time.Second)
}
本质上,我有两个不同的服务,在两个不同的 goroutine 中 运行 一个 record
方法。他们各自在不同的时间创建和写入不同的 csv 文件。当我 运行 这样做时,会创建 csv 文件但没有数据。 运行 执行此操作时不会出现任何错误。我读到我应该使用我已经实现的互斥体,但这也没有用。我应该在这里做什么?
如注释中所述,程序将在 main()
完成时退出; the spec 声明“它不等待其他(非主)goroutines 完成。”。
这意味着您的 go 例程不太可能处理关闭文件的代码,这意味着可能不会写入缓冲数据。
我在 the playground 中创建了您的应用程序的简化版本来演示这一点。
有很多方法可以解决这个问题,但最简单的方法可能是添加一个 WaitGroup
这样您的应用程序就可以在终止之前等待 go 例程退出。
我创建了一个最小的可重现示例
package main
import (
"encoding/csv"
"fmt"
"os"
"strconv"
"sync/atomic"
"time"
)
var (
csvOnePath = "test.csv"
csvTwoPath = "test_two.csv"
)
type A struct {
Running int32 // used atomically
QuitChan chan struct{}
}
func NewA() *A {
return &A{
QuitChan: make(chan struct{}),
}
}
func (a *A) Start() error {
if ok := atomic.CompareAndSwapInt32(&a.Running, 0, 1); !ok {
return fmt.Errorf("Cannot start service A: service already started")
}
go a.record()
return nil
}
func (a *A) Stop() error {
if ok := atomic.CompareAndSwapInt32(&a.Running, 1, 0); !ok {
return fmt.Errorf("Cannot stop service A: service already stopped")
}
close(a.QuitChan)
return nil
}
func (a *A) record() {
//file_one, err := os.OpenFile(csvOnePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755)
file_one, err := os.Create(csvOnePath)
if err != nil {
fmt.Println(err)
return
}
writer := csv.NewWriter(file_one)
// writer, closeFileFunc, err := NewCsvWriter(csvOnePath)
if err != nil {
fmt.Println(err)
return
}
header := []string{"this", "is", "a", "test"}
err = writer.Write(header)
if err != nil {
fmt.Println(err)
return
}
ticker := time.NewTicker(10*time.Second)
for {
select {
case t := <-ticker.C:
err = writer.Write([]string{fmt.Sprintf("%2d:%2d:%2d", t.Hour(), t.Minute(), t.Second())})
if err != nil {
fmt.Println(err)
a.QuitChan <- struct{}{}
}
case <-a.QuitChan:
ticker.Stop()
writer.Flush()
file_one.Close()
fmt.Println("Stopped recording.")
break
}
}
}
type B struct {
Running int32 // used atomically
QuitChan chan struct{}
}
func NewB() *B {
return &B{
QuitChan: make(chan struct{}),
}
}
func (b *B) Start() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 0, 1); !ok {
return fmt.Errorf("Cannot start service B: service already started")
}
go b.record()
return nil
}
func (b *B) Stop() error {
if ok := atomic.CompareAndSwapInt32(&b.Running, 1, 0); !ok {
return fmt.Errorf("Cannot stop service B: service already stopped")
}
close(b.QuitChan)
return nil
}
func writeMsgToReport(report *csv.Writer, msg string) error {
ct := time.Now()
timestamp := fmt.Sprintf("%2d:%2d:%2d", ct.Hour(), ct.Minute(), ct.Second())
return report.Write([]string{timestamp, msg})
}
func (b *B) record() {
//file_two, err := os.OpenFile(csvTwoPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
file_two, err := os.Create(csvTwoPath)
if err != nil {
fmt.Println(err)
return
}
writer := csv.NewWriter(file_two)
//writer, closeFileFunc, err := NewCsvWriter(csvTwoPath)
if err != nil {
fmt.Println(err)
return
}
header := []string{"this", "is", "a", "second", "test"}
err = writer.Write(header)
if err != nil {
fmt.Println(err)
return
}
ticker := time.NewTicker(1*time.Second)
ticks := 0
for {
select {
case <-ticker.C:
if ticks % 15 == 0 {
err = writeMsgToReport(writer, "YEET "+strconv.Itoa(ticks))
if err != nil {
fmt.Println(err)
b.QuitChan <- struct{}{}
}
}
ticks++
case <-b.QuitChan:
ticker.Stop()
writer.Flush()
file_two.Close()
fmt.Println("Stopped recording.")
break
}
}
}
func main() {
serviceA := NewA()
err := serviceA.Start()
if err != nil {
fmt.Println(err)
return
}
defer serviceA.Stop()
serviceB := NewB()
err = serviceB.Start()
if err != nil {
fmt.Println(err)
return
}
defer serviceB.Stop()
time.Sleep(600*time.Second)
}
本质上,我有两个不同的服务,在两个不同的 goroutine 中 运行 一个 record
方法。他们各自在不同的时间创建和写入不同的 csv 文件。当我 运行 这样做时,会创建 csv 文件但没有数据。 运行 执行此操作时不会出现任何错误。我读到我应该使用我已经实现的互斥体,但这也没有用。我应该在这里做什么?
如注释中所述,程序将在 main()
完成时退出; the spec 声明“它不等待其他(非主)goroutines 完成。”。
这意味着您的 go 例程不太可能处理关闭文件的代码,这意味着可能不会写入缓冲数据。
我在 the playground 中创建了您的应用程序的简化版本来演示这一点。
有很多方法可以解决这个问题,但最简单的方法可能是添加一个 WaitGroup
这样您的应用程序就可以在终止之前等待 go 例程退出。