如何在 golang 中同时将响应流回客户端?
How to concurrently stream response back to the client in golang?
我在服务器端的方法中使用以下代码将流式响应发送回客户端。
var cfg = "..." // comes from request parameters
var clientId = "...." // comes from request parameters
var data = allCids.([]int64)
// can we run this in parallel with X number of workers?
for _, cid := range data {
pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
if !pd.IsCorrect {
continue
}
resources := us.helperCom.GenerateResourceString(pd)
val, _ := us.GenerateInfo(clientId, resources, cfg)
if err := stream.Send(val); err != nil {
log.Printf("send error %v", err)
}
}
我的疑惑是我们可以同时将其变为 运行 吗?还是总是以单线程方式 运行 进行流式传输?
您可以使工作并发,但不能使流上的发送并发。来自 grpc-go
docs:
When using streams, one must take care to avoid calling either SendMsg
or RecvMsg
multiple times against the same Stream from different goroutines. [...]
因此,您可以 运行 在单独的 goroutine 中并发代码,并在公共通道上发送输出值。然后主流处理程序遍历此通道并依次调用 stream.Send
— 因此请记住,只有当网络响应时间少于获取数据时,所有这一切都是值得的。
代码如下所示:
// buffered, so the workers can send and exit immediately
out := make(chan <type of val>, len(data))
// you will have to close the out chan when all work is done
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
wg.Wait()
close(out)
}()
for _, cid := range data {
// don't close around the loop variable
go func (id int64) {
defer wg.Done()
val, err := // obtain output value somehow
if err != nil {
return
}
out <- val
}(cid)
}
for val := range out {
if err := stream.Send(val); err != nil {
log.Printf("send error %v", err)
}
}
goroutines的数量是data
中元素的数量。如果要控制goroutine的数量,batchdata
。如果这样做,请相应地调整通道缓冲区。
我在服务器端的方法中使用以下代码将流式响应发送回客户端。
var cfg = "..." // comes from request parameters
var clientId = "...." // comes from request parameters
var data = allCids.([]int64)
// can we run this in parallel with X number of workers?
for _, cid := range data {
pd := repo.GetCustomerData(strconv.FormatInt(cid, 10))
if !pd.IsCorrect {
continue
}
resources := us.helperCom.GenerateResourceString(pd)
val, _ := us.GenerateInfo(clientId, resources, cfg)
if err := stream.Send(val); err != nil {
log.Printf("send error %v", err)
}
}
我的疑惑是我们可以同时将其变为 运行 吗?还是总是以单线程方式 运行 进行流式传输?
您可以使工作并发,但不能使流上的发送并发。来自 grpc-go
docs:
When using streams, one must take care to avoid calling either
SendMsg
orRecvMsg
multiple times against the same Stream from different goroutines. [...]
因此,您可以 运行 在单独的 goroutine 中并发代码,并在公共通道上发送输出值。然后主流处理程序遍历此通道并依次调用 stream.Send
— 因此请记住,只有当网络响应时间少于获取数据时,所有这一切都是值得的。
代码如下所示:
// buffered, so the workers can send and exit immediately
out := make(chan <type of val>, len(data))
// you will have to close the out chan when all work is done
wg := &sync.WaitGroup{}
wg.Add(len(data))
go func() {
wg.Wait()
close(out)
}()
for _, cid := range data {
// don't close around the loop variable
go func (id int64) {
defer wg.Done()
val, err := // obtain output value somehow
if err != nil {
return
}
out <- val
}(cid)
}
for val := range out {
if err := stream.Send(val); err != nil {
log.Printf("send error %v", err)
}
}
goroutines的数量是data
中元素的数量。如果要控制goroutine的数量,batchdata
。如果这样做,请相应地调整通道缓冲区。