for 循环上的 goroutine 通道
goroutine channels over a for loop
我的 main
函数从文件中读取 json,将其解组为结构,将其转换为另一种结构类型并通过标准输出输出格式化的 JSON。
我正在尝试实现 goroutines 和通道来为我的 for
循环添加并发性。
func main() {
muvMap := map[string]string{"male": "M", "female": "F"}
fileA, err := os.Open("serviceAfileultimate.json")
if err != nil {
panic(err)
}
defer fileA.Close()
data := make([]byte, 10000)
count, err := fileA.Read(data)
if err != nil {
panic(err)
}
dataBytes := data[:count]
var servicesA ServiceA
json.Unmarshal(dataBytes, &servicesA)
var servicesB = make([]ServiceB, servicesA.Count)
goChannels := make(chan ServiceB, servicesA.Count)
for i := 0; i < servicesA.Count; i++ {
go func() {
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Address").SetString(Merge(&servicesA.Users[i].Location))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Date_Of_Birth").SetString(dateCopyTransform(servicesA.Users[i].Dob))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Email").SetString(servicesA.Users[i].Email)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Fullname").SetString(Merge(&servicesA.Users[i].Name))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Gender").SetString(muvMap[servicesA.Users[i].Gender])
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Phone").SetString(servicesA.Users[i].Cell)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Username").SetString(servicesA.Users[i].Username)
goChannels <- servicesB[i]
}()
}
for index := range goChannels {
json.NewEncoder(os.Stdout).Encode(index)
}
}
它编译但返回如下消息:
goroutine 1 [chan receive]: main.main() C://.....go.94 +0x55b.
您打印的是频道信息,而不是其中包含的数据。您不需要循环,您只想接收然后打印。
json := <-index
json.NewEncoder(os.Stdout).Encode(json)
现在我需要指出的是,该代码不会阻塞。如果您想继续阅读直到完成所有工作,您需要某种 locking/coordination 机制。
你会经常看到
for {
select {
case json := <-jsonChannel:
// do stuff
case <-abort:
// get out of here
}
}
为了解决这个问题。另外,仅供参考,您正在使用默认容量(意味着它是一个缓冲频道)初始化您的频道,这很奇怪。我建议您查看有关该主题的一些教程,因为总的来说您的设计需要一些工作实际上是对非并发实现的改进。最后,您可以找到图书馆来为您抽象其中的一些工作,大多数人可能会建议您这样做。这是一个例子; https://github.com/lytics/squaredance
我的 main
函数从文件中读取 json,将其解组为结构,将其转换为另一种结构类型并通过标准输出输出格式化的 JSON。
我正在尝试实现 goroutines 和通道来为我的 for
循环添加并发性。
func main() {
muvMap := map[string]string{"male": "M", "female": "F"}
fileA, err := os.Open("serviceAfileultimate.json")
if err != nil {
panic(err)
}
defer fileA.Close()
data := make([]byte, 10000)
count, err := fileA.Read(data)
if err != nil {
panic(err)
}
dataBytes := data[:count]
var servicesA ServiceA
json.Unmarshal(dataBytes, &servicesA)
var servicesB = make([]ServiceB, servicesA.Count)
goChannels := make(chan ServiceB, servicesA.Count)
for i := 0; i < servicesA.Count; i++ {
go func() {
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Address").SetString(Merge(&servicesA.Users[i].Location))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Date_Of_Birth").SetString(dateCopyTransform(servicesA.Users[i].Dob))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Email").SetString(servicesA.Users[i].Email)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Fullname").SetString(Merge(&servicesA.Users[i].Name))
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Gender").SetString(muvMap[servicesA.Users[i].Gender])
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Phone").SetString(servicesA.Users[i].Cell)
reflect.ValueOf(&servicesB[i]).Elem().FieldByName("Username").SetString(servicesA.Users[i].Username)
goChannels <- servicesB[i]
}()
}
for index := range goChannels {
json.NewEncoder(os.Stdout).Encode(index)
}
}
它编译但返回如下消息:
goroutine 1 [chan receive]: main.main() C://.....go.94 +0x55b.
您打印的是频道信息,而不是其中包含的数据。您不需要循环,您只想接收然后打印。
json := <-index
json.NewEncoder(os.Stdout).Encode(json)
现在我需要指出的是,该代码不会阻塞。如果您想继续阅读直到完成所有工作,您需要某种 locking/coordination 机制。
你会经常看到
for {
select {
case json := <-jsonChannel:
// do stuff
case <-abort:
// get out of here
}
}
为了解决这个问题。另外,仅供参考,您正在使用默认容量(意味着它是一个缓冲频道)初始化您的频道,这很奇怪。我建议您查看有关该主题的一些教程,因为总的来说您的设计需要一些工作实际上是对非并发实现的改进。最后,您可以找到图书馆来为您抽象其中的一些工作,大多数人可能会建议您这样做。这是一个例子; https://github.com/lytics/squaredance