GRPC 流关闭连接
GRPC stream close connect
我正在使用 grpc
在 golang
上编写服务器句柄数据流。收到请求后,我应该把这个流放到 Chan
,然后是
goroutine
处理此请求并发回。但是当我在 goroutine
中回信给客户时,我得到了一个 rpc error: code = Unavailable desc = transport is closing
。所以我想知道我是否可以将stream
传递给Channel
,这个操作会关闭连接吗?
这是在协议缓冲区中识别
service AsrService {
rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}
这里是使用grpc自动生成
type AsrService_RecognizeServer interface {
Send(*RecognizeResponse) error
Recv() (*RecognizeRequest, error)
grpc.ServerStream
}
这里是给 Chan 的流
func (s *ScheduleServer) Recognize(stream
AsrService_RecognizeServer) error {
req, err := stream.Recv() // I can use Recv here
if err == io.EOF || err != nil {
// do something
}
var asrRequest ASRRequest
asrRequest.stream = &stream //pass stream to Chan
ASRRequestChan <- &asrRequest
return nil
}
这里有一个 goroutine 来处理 Chan
type ASRRequest struct {
stream AsrService_RecognizeServer
}
var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
for {
select {
case r := <- ClientRequestChan:
Log.Infof("Chan get request info[%v]", r)
var rsp RecognizeResponse
rsp.Code = **
streamInter := *r.stream
err = streamInter.Send(&rsp) // I can use Send here
if err != nil {
fmt.Printf("Grpc write failed,err[%v]", err)
}
fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
}
}
}
然后我收到错误 rpc error: code = Unavailable desc = transport is closing,所以流在传递给 Chan 后是否关闭?因为如果我不使用Chan,它可以成功将结果发送给客户端。
我更改了策略并使用 sync.WaitGroup
来确保 main goroutine
在 stream
发回之前不会 return。我将构建一个 goroutine
来处理这个 stream
,而 main goroutine
在 child goroutine
完成之前不会 return。所以连接不会关闭。
var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
wg.Add(1)
go s.Recognize_Syn(&wg, stream)
wg.Wait()
return nil
}
func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
defer wg.Done()
//do something
err = stream.Send(&rsp)
return nil
}
我正在使用 grpc
在 golang
上编写服务器句柄数据流。收到请求后,我应该把这个流放到 Chan
,然后是
goroutine
处理此请求并发回。但是当我在 goroutine
中回信给客户时,我得到了一个 rpc error: code = Unavailable desc = transport is closing
。所以我想知道我是否可以将stream
传递给Channel
,这个操作会关闭连接吗?
这是在协议缓冲区中识别
service AsrService {
rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}
这里是使用grpc自动生成
type AsrService_RecognizeServer interface {
Send(*RecognizeResponse) error
Recv() (*RecognizeRequest, error)
grpc.ServerStream
}
这里是给 Chan 的流
func (s *ScheduleServer) Recognize(stream
AsrService_RecognizeServer) error {
req, err := stream.Recv() // I can use Recv here
if err == io.EOF || err != nil {
// do something
}
var asrRequest ASRRequest
asrRequest.stream = &stream //pass stream to Chan
ASRRequestChan <- &asrRequest
return nil
}
这里有一个 goroutine 来处理 Chan
type ASRRequest struct {
stream AsrService_RecognizeServer
}
var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
for {
select {
case r := <- ClientRequestChan:
Log.Infof("Chan get request info[%v]", r)
var rsp RecognizeResponse
rsp.Code = **
streamInter := *r.stream
err = streamInter.Send(&rsp) // I can use Send here
if err != nil {
fmt.Printf("Grpc write failed,err[%v]", err)
}
fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
}
}
}
然后我收到错误 rpc error: code = Unavailable desc = transport is closing,所以流在传递给 Chan 后是否关闭?因为如果我不使用Chan,它可以成功将结果发送给客户端。
我更改了策略并使用 sync.WaitGroup
来确保 main goroutine
在 stream
发回之前不会 return。我将构建一个 goroutine
来处理这个 stream
,而 main goroutine
在 child goroutine
完成之前不会 return。所以连接不会关闭。
var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
wg.Add(1)
go s.Recognize_Syn(&wg, stream)
wg.Wait()
return nil
}
func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
defer wg.Done()
//do something
err = stream.Send(&rsp)
return nil
}