基于 Go 的 grpc server stream 不断堆叠响应给 go client
Go based grpc server stream keeps stacking up the response to go client
抱歉,如果这是一个菜鸟问题,我是 grpc 服务器端流媒体的新手。
我现在在服务器上流式传输到客户端的功能中所拥有的
req, err := http.NewRequest("GET", actualURL, nil)
//跳过一些行//
res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
// 跳过一些行//
// unmarshaling the xml data received from the GET request done above
xmlDataErr := xml.Unmarshal(body, &ArrData)
// creating a variable of a custom type and creating object of specific struct Flight
var flightArrData ArrFlights
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total rows to be send to the client", len(currArrData))
// 遍历响应并将其流式传输到客户端
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
// skipping some lines //
}
// return 完成后为 nil
return 无
}
我面临的问题
在客户端,我有接收此响应的功能,休眠 n 分钟并再次请求响应。
客户端确实在第一次调用时得到了预期的响应,但是对于每个后续调用都会发生一些奇怪的事情,这是我当前的问题,我尝试以下面的每个后续调用的形式进行说明:
从客户端到服务器调用 1 --> 服务器 returns 200 行
客户端休眠n分钟
从客户端到服务器调用 2 --> 服务器 returns 400 行!! (基本上每一行都是200+200的两倍)
客户端休眠n分钟
从客户端到服务器调用 3 --> 服务器 returns 600 行!! (200+200+200)
总结
我确实在客户端检查错误 == io.EOF,现在停止从服务器到客户端的响应堆积的唯一方法是停止服务器并重新启动。
我不确定我在这里遗漏了什么以确保我只发送我从 GET 请求收到的实际和准确的响应。非常感谢对此的任何提示。
更多信息
proto 文件中 gRPC protobuffer def 的一部分
rpc GetFlightArrivals (FlightArrivalsRequestURL) returns (stream FlightArrivalsResponse) {
};
上述gRPC服务器端实现的完整代码
func (s *Server) GetFlightArrivals(url *pb.FlightArrivalsRequestURL, stream pb.Flight_GetFlightArrivalsServer) error {
cData := ch.ConfigProcessor() // fetches the initial part of the URL from config file
if url.ArrivalURL == "" {
actualURL = cData.FURL + "/arr/all" // adding the arrival endpoint
} else {
actualURL = url.ArrivalURL
}
// build new request to get arrival data
req, err := http.NewRequest("GET", actualURL, nil)
if err != nil {
log.Fatalln("Recheck URL or connectivity, failed to make REST call")
}
req.Header.Add("Cache-Control", "no-cache")
req.Header.Add("Accept", "text/plain")
req.Header.Add("Connection", "keep-alive")
req.Header.Add("app_id", cData.AppID)
req.Header.Add("app_key", cData.AppKey)
req.Header.Add("Content-Type", "application/xml")
res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatalln("Failed to get any response")
return err
}
// unmarshaling the xml data
xmlDataErr := xml.Unmarshal(body, &flightArrData)
if xmlDataErr != nil {
log.Fatalln("Failed to unmarshal arrival xml data see error, ", xmlDataErr)
return xmlDataErr
}
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total arrivals in Finland", len(currArrData))
log.Println("Starting FlightDeparturesResponse for client")
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
if senderr != nil {
log.Fatalln("Failed to stream arrival response to the client, see error ", senderr)
return senderr
}
}
currArrData = nil
log.Println("Attempting to empty the arrival data")
return nil
}
用于检查上述 gRPC 实现的测试用例
我刚刚启动了一个测试 grpc 服务器,并在这个测试用例中调用了那个 grpc。客户端上的实现与接收数据相同。
func TestGetFlightArrivals(t *testing.T) {
const addr = "localhost:50051"
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("Did not connect: #{err}")
}
defer conn.Close()
f := pb.NewFlightClient(conn)
t.Run("GetFlightArrivals", func(t *testing.T) {
var res, err = f.GetFlightArrivals(context.Background(), &pb.FlightArrivalsRequestURL{ArrivalURL: ""})
if err != nil {
t.Error("Failed to make the REST call", err)
}
for {
msg, err := res.Recv()
if err == io.EOF {
t.Log("Finished reading all the message")
break
}
if err != nil {
t.Error("Failed to receive response")
}
t.Log("Message from the server", msg.GetArrivalResponse())
}
})
}
根据@rubens21 的评论,我添加了以下几行
var emptyArrData ArrFlights
这基本上是完整的 XML 结构并将此 emptyArrData 分配给结构 flightArrData = emptyArrData
抱歉,如果这是一个菜鸟问题,我是 grpc 服务器端流媒体的新手。
我现在在服务器上流式传输到客户端的功能中所拥有的
req, err := http.NewRequest("GET", actualURL, nil)
//跳过一些行// res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
// 跳过一些行//
// unmarshaling the xml data received from the GET request done above
xmlDataErr := xml.Unmarshal(body, &ArrData)
// creating a variable of a custom type and creating object of specific struct Flight
var flightArrData ArrFlights
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total rows to be send to the client", len(currArrData))
// 遍历响应并将其流式传输到客户端
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
// skipping some lines //
}
// return 完成后为 nil return 无 }
我面临的问题 在客户端,我有接收此响应的功能,休眠 n 分钟并再次请求响应。
客户端确实在第一次调用时得到了预期的响应,但是对于每个后续调用都会发生一些奇怪的事情,这是我当前的问题,我尝试以下面的每个后续调用的形式进行说明:
从客户端到服务器调用 1 --> 服务器 returns 200 行
客户端休眠n分钟
从客户端到服务器调用 2 --> 服务器 returns 400 行!! (基本上每一行都是200+200的两倍)
客户端休眠n分钟
从客户端到服务器调用 3 --> 服务器 returns 600 行!! (200+200+200)
总结
我确实在客户端检查错误 == io.EOF,现在停止从服务器到客户端的响应堆积的唯一方法是停止服务器并重新启动。
我不确定我在这里遗漏了什么以确保我只发送我从 GET 请求收到的实际和准确的响应。非常感谢对此的任何提示。
更多信息
proto 文件中 gRPC protobuffer def 的一部分
rpc GetFlightArrivals (FlightArrivalsRequestURL) returns (stream FlightArrivalsResponse) {
};
上述gRPC服务器端实现的完整代码
func (s *Server) GetFlightArrivals(url *pb.FlightArrivalsRequestURL, stream pb.Flight_GetFlightArrivalsServer) error {
cData := ch.ConfigProcessor() // fetches the initial part of the URL from config file
if url.ArrivalURL == "" {
actualURL = cData.FURL + "/arr/all" // adding the arrival endpoint
} else {
actualURL = url.ArrivalURL
}
// build new request to get arrival data
req, err := http.NewRequest("GET", actualURL, nil)
if err != nil {
log.Fatalln("Recheck URL or connectivity, failed to make REST call")
}
req.Header.Add("Cache-Control", "no-cache")
req.Header.Add("Accept", "text/plain")
req.Header.Add("Connection", "keep-alive")
req.Header.Add("app_id", cData.AppID)
req.Header.Add("app_key", cData.AppKey)
req.Header.Add("Content-Type", "application/xml")
res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatalln("Failed to get any response")
return err
}
// unmarshaling the xml data
xmlDataErr := xml.Unmarshal(body, &flightArrData)
if xmlDataErr != nil {
log.Fatalln("Failed to unmarshal arrival xml data see error, ", xmlDataErr)
return xmlDataErr
}
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total arrivals in Finland", len(currArrData))
log.Println("Starting FlightDeparturesResponse for client")
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
if senderr != nil {
log.Fatalln("Failed to stream arrival response to the client, see error ", senderr)
return senderr
}
}
currArrData = nil
log.Println("Attempting to empty the arrival data")
return nil
}
用于检查上述 gRPC 实现的测试用例
我刚刚启动了一个测试 grpc 服务器,并在这个测试用例中调用了那个 grpc。客户端上的实现与接收数据相同。
func TestGetFlightArrivals(t *testing.T) {
const addr = "localhost:50051"
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("Did not connect: #{err}")
}
defer conn.Close()
f := pb.NewFlightClient(conn)
t.Run("GetFlightArrivals", func(t *testing.T) {
var res, err = f.GetFlightArrivals(context.Background(), &pb.FlightArrivalsRequestURL{ArrivalURL: ""})
if err != nil {
t.Error("Failed to make the REST call", err)
}
for {
msg, err := res.Recv()
if err == io.EOF {
t.Log("Finished reading all the message")
break
}
if err != nil {
t.Error("Failed to receive response")
}
t.Log("Message from the server", msg.GetArrivalResponse())
}
})
}
根据@rubens21 的评论,我添加了以下几行
var emptyArrData ArrFlights
这基本上是完整的 XML 结构并将此 emptyArrData 分配给结构 flightArrData = emptyArrData