从 grpc 服务器拦截器获取流文件大小

Get streamed file size from grpc server interceptor

我在原型文件中有一个这样定义的 Go 服务器双向流方法:

syntax = "proto3";
option go_package="pdfcompose/;pdfcompose";
package pdfcompose;

service PdfCompose {
  rpc Send (stream FileForm) returns (stream PdfFile) {}
}

message FileForm {
  bytes Upfile1 = 1;
  bytes Upfile2 = 2;
  bytes Upfile3 = 3;
}

message PdfFile {
  bytes File = 1;
}

而我的日志拦截器有如下接口:

func logInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)  error {
    fmt.Println("Log Interceptor")
    err := handler(srv, ss)
    if err != nil {
        return err
    }
    return nil
}

我正在使用 https://github.com/grpc-ecosystem/go-grpc-middleware 作为拦截器引擎。 我需要实现流式文件大小的日志记录(用于教育目的)并试图找出我可以从何处获取有关 FileForm 及其内容的任何数据。

我的第一个猜测是查看 grpc.ServerStream 参数 (ss) 以找到有关它的信息,它看起来包含很多数据,例如最大和最小 MessageSize,但注意实际内容长度。

如何使用这种拦截器获取传入文件的大小?

因此,正如@Brits 上面提到的,实现我想要的工作方式是为流编写包装器。

这是一个例子:https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/validator/validator.go我从这个 repo 中获取了以下代码,我希望我正确理解了 apache2 许可证并且复制该代码的一部分没有问题:

// StreamServerInterceptor returns a new streaming server interceptor that validates incoming messages.
//
// The stage at which invalid messages will be rejected with `InvalidArgument` varies based on the
// type of the RPC. For `ServerStream` (1:m) requests, it will happen before reaching any userspace
// handlers. For `ClientStream` (n:1) or `BidiStream` (n:m) RPCs, the messages will be rejected on
// calls to `stream.Recv()`.
func StreamServerInterceptor() grpc.StreamServerInterceptor {
    return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        wrapper := &recvWrapper{stream}
        return handler(srv, wrapper)
    }
}

type recvWrapper struct {
    grpc.ServerStream
}

func (s *recvWrapper) RecvMsg(m interface{}) error {
    if err := s.ServerStream.RecvMsg(m); err != nil {
        return err
    }

    if err := validate(m); err != nil {
        return err
    }

    return nil
}

validate() 函数实际上以 interface{} 形式获取请求,您需要使用类型断言将此请求转换为您需要的类型 v := req.(type).

例如,我向 FileForm 键入请求并能够检查其内容:

func logFileSize(req interface{}) error {
    m := req.(*pdfcompose.FileForm)
    println("SizeOfUpfile1: " + strconv.Itoa(int(binary.Size(m.Upfile1))))

    if m.Upfile2 != nil {
        println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
    }
    if m.Upfile3 != nil {
        println("SizeOfUpfile2: " + strconv.Itoa(int(binary.Size(m.Upfile3))))
    }
    return nil
}

func (s *recvWrapper) RecvMsg(m interface{}) error {
    if err := s.ServerStream.RecvMsg(m); err != nil {
        return err
    }
    //z := m.(pdfcompose.FileForm)
    if err := logFileSize(m); err != nil {
        return err
    }

    return nil
}

func StreamServerInterceptor() grpc.StreamServerInterceptor {
    return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        wrapper := &recvWrapper{stream}
        return handler(srv, wrapper)
    }
}