Go GRPC Refresh token 用于双向流

Go GRPC Refresh token for a bidirectional stream

TLDR:我正在寻找一种方法来在每次调用 stream.Send(msg) 时在打开的流上更新 header,而无需关闭流并打开一个新的。

总结

我有一个 GRPC 客户端和服务器,用于处理双向流。要向服务器进行身份验证,客户端必须在请求 headers 中发送 JWT,设置为“授权”。令牌有效期为 30 分钟。令牌过期后,服务器将终止连接。

我正在寻找一种方法来从客户端刷新我的授权令牌,并保持流打开。客户端应该 运行 在一个循环中每 30 分钟使用更新的令牌和更新的有效负载执行一个新请求。我还没有看到从客户端为已打开的流更新 header 的方法。

让我们看一些代码来了解客户端是什么样的。下面的代码有一个创建客户端新实例的功能,另一个功能是建立与 GRPC 服务器的连接。

func NewWatchClient(config *Config, logger *logrus.Logger) (*WatchClient, error) {
    cc, err := newConnection(config, logger)
    if err != nil {
        return nil, err
    }

    service := proto.NewWatchServiceClient(cc)

    return &WatchClient{
        config:  config,
        conn:    cc,
        logger:  entry,
        service: service,
    }, nil
}

func newConnection(config *Config, logger *logrus.Logger) (*grpc.ClientConn, error) {
    address := fmt.Sprintf("%s:%d", config.Host, config.Port)

    // rpcCredential implements credentials.PerRPCCredentials
    rpcCredential := newTokenAuth(config.Auth, config.TenantID)

    return grpc.Dial(
        address,
        grpc.WithPerRPCCredentials(rpcCredential),
    )
}

查看上面的 newConnection 函数,我们可以看到调用另一个函数 newTokenAuth 来创建身份验证令牌,效果很好。这个 func returns 一个实现 PerRPCCredentials 接口的结构。

有两种方法可以设置请求的权限。

  1. 使用grpc.WithPerRPCCredentials在创建到服务器的连接时添加授权。

  2. 使用 grpc.PerRPCCredentials 为连接到服务器时打开的每个流添加授权。

在这种情况下,我使用 grpc.WithPerRPCCredentials 在创建与服务器的连接时附加令牌。

现在,让我们看一下PerRPCCredentials的定义。

type PerRPCCredentials interface {
    // GetRequestMetadata gets the current request metadata, refreshing
    // tokens if required. This should be called by the transport layer on
    // each request, and the data should be populated in headers or other
    // context. If a status code is returned, it will be used as the status
    // for the RPC. uri is the URI of the entry point for the request.
    // When supported by the underlying implementation, ctx can be used for
    // timeout and cancellation. Additionally, RequestInfo data will be
    // available via ctx to this call.
    // TODO(zhaoq): Define the set of the qualified keys instead of leaving
    // it as an arbitrary string.
    GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
    // RequireTransportSecurity indicates whether the credentials requires
    // transport security.
    RequireTransportSecurity() bool
}

该接口要求您定义两个方法。 GetRequestMetadata 的文档说

GetRequestMetadata gets the current request metadata, refreshing tokens if required

所以,看起来我的 PerRPCCredentials 实现应该能够为我的流或连接处理令牌刷新。来看看我实现的PerRPCCredentials.

// tokenAuth implements the PerRPCCredentials interface
type tokenAuth struct {
    tenantID       string
    tokenRequester auth.PlatformTokenGetter
    token          string
}

// RequireTransportSecurity leave as false for now
func (tokenAuth) RequireTransportSecurity() bool {
    return false
}

// GetRequestMetadata sets the http header prior to transport
func (t tokenAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
    token, err := t.tokenRequester.GetToken()
    if err != nil {
        return nil, err
    }
    t.token = token

    go func() {
        time.Sleep(25 * time.Minute)
        token, _ := t.tokenRequester.GetToken()
        t.token = token
    }()

    return map[string]string{
        "tenant-id": t.tenantID,
        "authorization":     "Bearer " + t.token,
    }, nil
}

如您所见,对 GetRequestMetadata 的调用将建立一个 go 例程,该例程将尝试每 25 分钟刷新一次令牌。在这里添加一个 go routine 可能不是正确的方法。试图让 auth header 刷新,但没有用。

我们来看一下直播。

func (w WatchClient) CreateWatch() error {
    topic := &proto.Request{SelfLink: w.config.TopicSelfLink}

    stream, err := w.service.CreateWatch(context.Background())
    if err != nil {
        return err
    }

    for {
        err = stream.Send(topic)
        if err != nil {
            return err
        }
        time.Sleep(25 * time.Minute)
    }
}

客户端每 25 分钟在流上发送一条消息。我想要到达这里的是,当调用 stream.Send 时,也会发送更新的令牌。

此函数 GetRequestMetadata 只被调用一次,无论我是通过 grpc.WithPerRPCCredentials 还是 grpc.PerRPCCredsCallOption 设置身份验证,所以似乎无法更新授权 header.

如果您知道我在尝试使用 PerRPCCredentials 进行令牌刷新时遗漏了什么,或者如果您知道可以通过其他方式完成此操作,可能是通过拦截器或其他方式那么请告诉我。

谢谢。

Headers在RPC开始时发送,在RPC过程中不能更新。如果您需要在流的生命周期内发送数据,它需要成为原型定义中请求消息的一部分。