GRPC 流关闭连接

GRPC stream close connect

我正在使用 grpcgolang 上编写服务器句柄数据流。收到请求后,我应该把这个流放到 Chan,然后是 goroutine 处理此请求并发回。但是当我在 goroutine 中回信给客户时,我得到了一个 rpc error: code = Unavailable desc = transport is closing。所以我想知道我是否可以将stream传递给Channel,这个操作会关闭连接吗?

这是在协议缓冲区中识别

service AsrService {
     rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}

这里是使用grpc自动生成

type AsrService_RecognizeServer interface {
    Send(*RecognizeResponse) error
    Recv() (*RecognizeRequest, error)
    grpc.ServerStream
}

这里是给 Chan 的流

func (s *ScheduleServer) Recognize(stream 
AsrService_RecognizeServer) error {
    req, err := stream.Recv() // I can use Recv here
    if err == io.EOF || err != nil {
        // do something
    }
    var asrRequest ASRRequest
    asrRequest.stream = &stream //pass stream to Chan
    ASRRequestChan <- &asrRequest

    return nil
}

这里有一个 goroutine 来处理 Chan

type ASRRequest struct {
    stream AsrService_RecognizeServer
}

var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
    for {
        select {
            case r := <- ClientRequestChan:
                Log.Infof("Chan get request info[%v]", r)
                var rsp RecognizeResponse
                rsp.Code = **
                streamInter := *r.stream
                err = streamInter.Send(&rsp) // I can use Send here
                if err != nil {
                    fmt.Printf("Grpc write failed,err[%v]", err)
                }
                fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
        }
    }    
}

然后我收到错误 rpc error: code = Unavailable desc = transport is closing,所以流在传递给 Chan 后是否关闭?因为如果我不使用Chan,它可以成功将结果发送给客户端。

我更改了策略并使用 sync.WaitGroup 来确保 main goroutinestream 发回之前不会 return。我将构建一个 goroutine 来处理这个 stream,而 main goroutinechild goroutine 完成之前不会 return。所以连接不会关闭。

var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
    wg.Add(1)
    go s.Recognize_Syn(&wg, stream)

    wg.Wait()
    return nil
}

func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
    defer wg.Done()
    //do something
    err = stream.Send(&rsp)
    return nil
}