Go GRPC Refresh token 用于双向流
Go GRPC Refresh token for a bidirectional stream
TLDR:我正在寻找一种方法来在每次调用 stream.Send(msg)
时在打开的流上更新 header,而无需关闭流并打开一个新的。
总结
我有一个 GRPC 客户端和服务器,用于处理双向流。要向服务器进行身份验证,客户端必须在请求 headers 中发送 JWT,设置为“授权”。令牌有效期为 30 分钟。令牌过期后,服务器将终止连接。
我正在寻找一种方法来从客户端刷新我的授权令牌,并保持流打开。客户端应该 运行 在一个循环中每 30 分钟使用更新的令牌和更新的有效负载执行一个新请求。我还没有看到从客户端为已打开的流更新 header 的方法。
让我们看一些代码来了解客户端是什么样的。下面的代码有一个创建客户端新实例的功能,另一个功能是建立与 GRPC 服务器的连接。
func NewWatchClient(config *Config, logger *logrus.Logger) (*WatchClient, error) {
cc, err := newConnection(config, logger)
if err != nil {
return nil, err
}
service := proto.NewWatchServiceClient(cc)
return &WatchClient{
config: config,
conn: cc,
logger: entry,
service: service,
}, nil
}
func newConnection(config *Config, logger *logrus.Logger) (*grpc.ClientConn, error) {
address := fmt.Sprintf("%s:%d", config.Host, config.Port)
// rpcCredential implements credentials.PerRPCCredentials
rpcCredential := newTokenAuth(config.Auth, config.TenantID)
return grpc.Dial(
address,
grpc.WithPerRPCCredentials(rpcCredential),
)
}
查看上面的 newConnection
函数,我们可以看到调用另一个函数 newTokenAuth
来创建身份验证令牌,效果很好。这个 func returns 一个实现 PerRPCCredentials 接口的结构。
有两种方法可以设置请求的权限。
使用grpc.WithPerRPCCredentials在创建到服务器的连接时添加授权。
使用 grpc.PerRPCCredentials 为连接到服务器时打开的每个流添加授权。
在这种情况下,我使用 grpc.WithPerRPCCredentials
在创建与服务器的连接时附加令牌。
现在,让我们看一下PerRPCCredentials的定义。
type PerRPCCredentials interface {
// GetRequestMetadata gets the current request metadata, refreshing
// tokens if required. This should be called by the transport layer on
// each request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status
// for the RPC. uri is the URI of the entry point for the request.
// When supported by the underlying implementation, ctx can be used for
// timeout and cancellation. Additionally, RequestInfo data will be
// available via ctx to this call.
// TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
// RequireTransportSecurity indicates whether the credentials requires
// transport security.
RequireTransportSecurity() bool
}
该接口要求您定义两个方法。 GetRequestMetadata
的文档说
GetRequestMetadata gets the current request metadata, refreshing tokens if required
所以,看起来我的 PerRPCCredentials
实现应该能够为我的流或连接处理令牌刷新。来看看我实现的PerRPCCredentials
.
// tokenAuth implements the PerRPCCredentials interface
type tokenAuth struct {
tenantID string
tokenRequester auth.PlatformTokenGetter
token string
}
// RequireTransportSecurity leave as false for now
func (tokenAuth) RequireTransportSecurity() bool {
return false
}
// GetRequestMetadata sets the http header prior to transport
func (t tokenAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
token, err := t.tokenRequester.GetToken()
if err != nil {
return nil, err
}
t.token = token
go func() {
time.Sleep(25 * time.Minute)
token, _ := t.tokenRequester.GetToken()
t.token = token
}()
return map[string]string{
"tenant-id": t.tenantID,
"authorization": "Bearer " + t.token,
}, nil
}
如您所见,对 GetRequestMetadata
的调用将建立一个 go 例程,该例程将尝试每 25 分钟刷新一次令牌。在这里添加一个 go routine 可能不是正确的方法。试图让 auth header 刷新,但没有用。
我们来看一下直播。
func (w WatchClient) CreateWatch() error {
topic := &proto.Request{SelfLink: w.config.TopicSelfLink}
stream, err := w.service.CreateWatch(context.Background())
if err != nil {
return err
}
for {
err = stream.Send(topic)
if err != nil {
return err
}
time.Sleep(25 * time.Minute)
}
}
客户端每 25 分钟在流上发送一条消息。我想要到达这里的是,当调用 stream.Send
时,也会发送更新的令牌。
此函数 GetRequestMetadata
只被调用一次,无论我是通过 grpc.WithPerRPCCredentials
还是 grpc.PerRPCCredsCallOption
设置身份验证,所以似乎无法更新授权 header.
如果您知道我在尝试使用 PerRPCCredentials
进行令牌刷新时遗漏了什么,或者如果您知道可以通过其他方式完成此操作,可能是通过拦截器或其他方式那么请告诉我。
谢谢。
Headers在RPC开始时发送,在RPC过程中不能更新。如果您需要在流的生命周期内发送数据,它需要成为原型定义中请求消息的一部分。
TLDR:我正在寻找一种方法来在每次调用 stream.Send(msg)
时在打开的流上更新 header,而无需关闭流并打开一个新的。
总结
我有一个 GRPC 客户端和服务器,用于处理双向流。要向服务器进行身份验证,客户端必须在请求 headers 中发送 JWT,设置为“授权”。令牌有效期为 30 分钟。令牌过期后,服务器将终止连接。
我正在寻找一种方法来从客户端刷新我的授权令牌,并保持流打开。客户端应该 运行 在一个循环中每 30 分钟使用更新的令牌和更新的有效负载执行一个新请求。我还没有看到从客户端为已打开的流更新 header 的方法。
让我们看一些代码来了解客户端是什么样的。下面的代码有一个创建客户端新实例的功能,另一个功能是建立与 GRPC 服务器的连接。
func NewWatchClient(config *Config, logger *logrus.Logger) (*WatchClient, error) {
cc, err := newConnection(config, logger)
if err != nil {
return nil, err
}
service := proto.NewWatchServiceClient(cc)
return &WatchClient{
config: config,
conn: cc,
logger: entry,
service: service,
}, nil
}
func newConnection(config *Config, logger *logrus.Logger) (*grpc.ClientConn, error) {
address := fmt.Sprintf("%s:%d", config.Host, config.Port)
// rpcCredential implements credentials.PerRPCCredentials
rpcCredential := newTokenAuth(config.Auth, config.TenantID)
return grpc.Dial(
address,
grpc.WithPerRPCCredentials(rpcCredential),
)
}
查看上面的 newConnection
函数,我们可以看到调用另一个函数 newTokenAuth
来创建身份验证令牌,效果很好。这个 func returns 一个实现 PerRPCCredentials 接口的结构。
有两种方法可以设置请求的权限。
使用grpc.WithPerRPCCredentials在创建到服务器的连接时添加授权。
使用 grpc.PerRPCCredentials 为连接到服务器时打开的每个流添加授权。
在这种情况下,我使用 grpc.WithPerRPCCredentials
在创建与服务器的连接时附加令牌。
现在,让我们看一下PerRPCCredentials的定义。
type PerRPCCredentials interface {
// GetRequestMetadata gets the current request metadata, refreshing
// tokens if required. This should be called by the transport layer on
// each request, and the data should be populated in headers or other
// context. If a status code is returned, it will be used as the status
// for the RPC. uri is the URI of the entry point for the request.
// When supported by the underlying implementation, ctx can be used for
// timeout and cancellation. Additionally, RequestInfo data will be
// available via ctx to this call.
// TODO(zhaoq): Define the set of the qualified keys instead of leaving
// it as an arbitrary string.
GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
// RequireTransportSecurity indicates whether the credentials requires
// transport security.
RequireTransportSecurity() bool
}
该接口要求您定义两个方法。 GetRequestMetadata
的文档说
GetRequestMetadata gets the current request metadata, refreshing tokens if required
所以,看起来我的 PerRPCCredentials
实现应该能够为我的流或连接处理令牌刷新。来看看我实现的PerRPCCredentials
.
// tokenAuth implements the PerRPCCredentials interface
type tokenAuth struct {
tenantID string
tokenRequester auth.PlatformTokenGetter
token string
}
// RequireTransportSecurity leave as false for now
func (tokenAuth) RequireTransportSecurity() bool {
return false
}
// GetRequestMetadata sets the http header prior to transport
func (t tokenAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
token, err := t.tokenRequester.GetToken()
if err != nil {
return nil, err
}
t.token = token
go func() {
time.Sleep(25 * time.Minute)
token, _ := t.tokenRequester.GetToken()
t.token = token
}()
return map[string]string{
"tenant-id": t.tenantID,
"authorization": "Bearer " + t.token,
}, nil
}
如您所见,对 GetRequestMetadata
的调用将建立一个 go 例程,该例程将尝试每 25 分钟刷新一次令牌。在这里添加一个 go routine 可能不是正确的方法。试图让 auth header 刷新,但没有用。
我们来看一下直播。
func (w WatchClient) CreateWatch() error {
topic := &proto.Request{SelfLink: w.config.TopicSelfLink}
stream, err := w.service.CreateWatch(context.Background())
if err != nil {
return err
}
for {
err = stream.Send(topic)
if err != nil {
return err
}
time.Sleep(25 * time.Minute)
}
}
客户端每 25 分钟在流上发送一条消息。我想要到达这里的是,当调用 stream.Send
时,也会发送更新的令牌。
此函数 GetRequestMetadata
只被调用一次,无论我是通过 grpc.WithPerRPCCredentials
还是 grpc.PerRPCCredsCallOption
设置身份验证,所以似乎无法更新授权 header.
如果您知道我在尝试使用 PerRPCCredentials
进行令牌刷新时遗漏了什么,或者如果您知道可以通过其他方式完成此操作,可能是通过拦截器或其他方式那么请告诉我。
谢谢。
Headers在RPC开始时发送,在RPC过程中不能更新。如果您需要在流的生命周期内发送数据,它需要成为原型定义中请求消息的一部分。