在 Service Fabric 传输中传递对 Reliable Services 的调用中的用户和审核信息

Passing user and auditing information in calls to Reliable Services in Service Fabric transport

如何以一种简单的方式在客户端和服务之间传递审计信息,而不必将该信息添加为所有服务方法的参数?我可以使用消息 headers 为通话设置此数据吗?

有没有办法允许服务也将其传递到下游,即,如果服务 A 调用服务 B,服务 B 调用服务 C,是否可以先将相同的审计信息发送给 A,然后在 A 的调用中发送给 B,然后在 B 的调用中打电话给 C?

如果您使用 fabric transport 进行远程处理,实际上有一个在客户端和服务之间传递的 header 的概念。如果您使用的是 Http 传输,那么您会像处理任何 http 请求一样拥有 headers。

请注意,下面的建议不是最简单的解决方案,但它一旦到位就解决了问题并且很容易使用,但如果你在整个代码库中寻找简单的方法,这可能不是路要走。如果是这种情况,那么我建议您只需将一些通用审计信息参数添加到所有服务方法中即可。当一些开发人员忘记添加它或者在调用下游服务时它没有正确设置时,当然会有一个很大的警告。一切都是关于 trade-offs,一如既往的代码 :).

掉入兔子洞

在结构传输中,通信涉及两个 classes:发送 IServiceRemotingClient on the client side, and an instance of IServiceRemotingListener on the service side. In each request from the client the messgae body and ServiceRemotingMessageHeaders 的一个实例。开箱即用的这些 headers 包括正在调用哪个接口(即哪个服务)和哪个方法的信息(这也是底层接收器如何知道如何解压缩 body 字节数组的方式) ).对于通过 ActorService 对 Actors 的调用,额外的 Actor 信息也包含在那些 header 中。

棘手的部分是连接到该交换并实际设置,然后读取额外的 headers。请耐心等待,我们需要了解幕后涉及的许多 classes。

服务端

当您为您的服务设置 IServiceRemotingListener 时(无状态服务的示例),您通常会使用方便的扩展方法,如下所示:

 protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
 {
     yield return new ServiceInstanceListener(context => 
         this.CreateServiceRemotingListener(this.Context));
 }

(另一种方法是实现自己的监听器,但这并不是我们真正不想做的,我们只是不想在现有的基础上添加东西基础设施。见下文了解该方法。)

这是我们可以提供自己的侦听器的地方,类似于扩展方法在幕后所做的事情。让我们首先看看该扩展方法的作用。它会在您的服务项目上寻找程序集级别的特定属性:ServiceRemotingProviderAttribute. That one is abstract, but the one that you can use, and which you will get a default instance of, if none is provided, is FabricTransportServiceRemotingProviderAttribute。在 AssemblyInfo.cs 中设置(或任何其他文件,它是一个程序集属性):

[assembly: FabricTransportServiceRemotingProvider()]

这个属性有两个有趣的可覆盖方法:

public override IServiceRemotingListener CreateServiceRemotingListener(
    ServiceContext serviceContext, IService serviceImplementation)
public override IServiceRemotingClientFactory CreateServiceRemotingClientFactory(
    IServiceRemotingCallbackClient callbackClient)

这两个方法负责创建监听器和客户端工厂。这意味着它也被交易的客户端检查。这就是为什么它是服务组件的组件级别的属性,客户端也可以将它与我们要与之通信的客户端的 IService 派生接口一起选取。

CreateServiceRemotingListener ends up creating an instance FabricTransportServiceRemotingListener, however in this implementation we cannot set our own specific IServiceRemotingMessageHandler。如果您创建自己的 FabricTransportServiceRemotingProviderAttribute 的子 class 并覆盖它,那么您实际上可以创建一个 FabricTransportServiceRemotingListener 的实例,该实例在构造函数中接收调度程序:

public class AuditableFabricTransportServiceRemotingProviderAttribute : 
    FabricTransportServiceRemotingProviderAttribute
{
    public override IServiceRemotingListener CreateServiceRemotingListener(
        ServiceContext serviceContext, IService serviceImplementation)
    {
            var messageHandler = new AuditableServiceRemotingDispatcher(
                serviceContext, serviceImplementation);

            return (IServiceRemotingListener)new FabricTransportServiceRemotingListener(
                serviceContext: serviceContext,
                messageHandler: messageHandler);
    }
}

AuditableServiceRemotingDispatcher 是奇迹发生的地方。是我们自己的ServiceRemotingDispatcher子class。覆盖 RequestResponseAsync(忽略 HandleOneWay,服务远程处理不支持它,如果调用它会抛出 NotImplementedException),像这样:

public class AuditableServiceRemotingDispatcher : ServiceRemotingDispatcher
{
    public AuditableServiceRemotingDispatcher(ServiceContext serviceContext, IService service) : 
        base(serviceContext, service) { }

    public override async Task<byte[]> RequestResponseAsync(
        IServiceRemotingRequestContext requestContext, 
        ServiceRemotingMessageHeaders messageHeaders, 
        byte[] requestBodyBytes)
    {
        byte[] userHeader = null;
        if (messageHeaders.TryGetHeaderValue("user-header", out auditHeader))
        {
            // Deserialize from byte[] and handle the header
        }
        else
        {
            // Throw exception?
        }

        byte[] result = null;        
        result = await base.RequestResponseAsync(requestContext, messageHeaders, requestBodyBytes);
        return result;
    }
}

另一种更简单但不太灵活的方法是直接在服务中使用我们的自定义调度程序的实例直接创建 FabricTransportServiceRemotingListener 的实例:

 protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
 {
     yield return new ServiceInstanceListener(context => 
         new FabricTransportServiceRemotingListener(this.Context, new AuditableServiceRemotingDispatcher(context, this)));
 }

为什么这样不够灵活?好吧,因为使用该属性也支持客户端,如下所示

客户端

好的,现在我们可以在接收消息时读取自定义 headers,如何设置这些?让我们看看该属性的另一个方法:

public override IServiceRemotingClientFactory CreateServiceRemotingClientFactory(IServiceRemotingCallbackClient callbackClient)
{
    return (IServiceRemotingClientFactory)new FabricTransportServiceRemotingClientFactory(
        callbackClient: callbackClient,
        servicePartitionResolver: (IServicePartitionResolver)null,
        traceId: (string)null);
}

在这里我们不能只为服务注入特定的处理程序或类似的处理程序,我们必须提供自己的自定义工厂。为了不必重新实现 FabricTransportServiceRemotingClientFactory I simply encapsulate it in my own implementation of IServiceRemotingClientFactory 的细节:

public class AuditedFabricTransportServiceRemotingClientFactory : IServiceRemotingClientFactory, ICommunicationClientFactory<IServiceRemotingClient>
{
    private readonly ICommunicationClientFactory<IServiceRemotingClient> _innerClientFactory;

    public AuditedFabricTransportServiceRemotingClientFactory(ICommunicationClientFactory<IServiceRemotingClient> innerClientFactory)
    {
        _innerClientFactory = innerClientFactory;
        _innerClientFactory.ClientConnected += OnClientConnected;
        _innerClientFactory.ClientDisconnected += OnClientDisconnected;
    }

    private void OnClientConnected(object sender, CommunicationClientEventArgs<IServiceRemotingClient> e)
    {
        EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> clientConnected = this.ClientConnected;
        if (clientConnected == null) return;
        clientConnected((object)this, new CommunicationClientEventArgs<IServiceRemotingClient>()
        {
            Client = e.Client
        });
    }

    private void OnClientDisconnected(object sender, CommunicationClientEventArgs<IServiceRemotingClient> e)
    {
        EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> clientDisconnected = this.ClientDisconnected;
        if (clientDisconnected == null) return;
        clientDisconnected((object)this, new CommunicationClientEventArgs<IServiceRemotingClient>()
        {
            Client = e.Client
        });
    }

    public async Task<IServiceRemotingClient> GetClientAsync(
        Uri serviceUri,
        ServicePartitionKey partitionKey, 
        TargetReplicaSelector targetReplicaSelector, 
        string listenerName,
        OperationRetrySettings retrySettings, 
        CancellationToken cancellationToken)
    {
        var client = await _innerClientFactory.GetClientAsync(
            serviceUri, 
            partitionKey, 
            targetReplicaSelector, 
            listenerName, 
            retrySettings, 
            cancellationToken);
        return new AuditedFabricTransportServiceRemotingClient(client);
    }

    public async Task<IServiceRemotingClient> GetClientAsync(
        ResolvedServicePartition previousRsp, 
        TargetReplicaSelector targetReplicaSelector, 
        string listenerName, 
        OperationRetrySettings retrySettings,
        CancellationToken cancellationToken)
    {
        var client = await _innerClientFactory.GetClientAsync(
            previousRsp, 
            targetReplicaSelector, 
            listenerName, 
            retrySettings, 
            cancellationToken);
        return new AuditedFabricTransportServiceRemotingClient(client);
    }

    public Task<OperationRetryControl> ReportOperationExceptionAsync(
        IServiceRemotingClient client, 
        ExceptionInformation exceptionInformation, 
        OperationRetrySettings retrySettings,
        CancellationToken cancellationToken)
    {
        return _innerClientFactory.ReportOperationExceptionAsync(
            client, 
            exceptionInformation, 
            retrySettings, 
            cancellationToken);
    }

    public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientConnected;
    public event EventHandler<CommunicationClientEventArgs<IServiceRemotingClient>> ClientDisconnected;
}

这个实现简单地将任何繁重的工作传递给底层工厂,同时返回它自己的可审计客户端,该客户端类似地封装了一个 IServiceRemotingClient:

 public class AuditedFabricTransportServiceRemotingClient : IServiceRemotingClient, ICommunicationClient
{
    private readonly IServiceRemotingClient _innerClient;

    public AuditedFabricTransportServiceRemotingClient(IServiceRemotingClient innerClient)
    {
        _innerClient = innerClient;
    }

    ~AuditedFabricTransportServiceRemotingClient()
    {
        if (this._innerClient == null) return;
        var disposable = this._innerClient as IDisposable;
        disposable?.Dispose();
    }

    Task<byte[]> IServiceRemotingClient.RequestResponseAsync(ServiceRemotingMessageHeaders messageHeaders, byte[] requestBody)
    {            
        messageHeaders.SetUser(ServiceRequestContext.Current.User);
        messageHeaders.SetCorrelationId(ServiceRequestContext.Current.CorrelationId);
        return this._innerClient.RequestResponseAsync(messageHeaders, requestBody);
    }

    void IServiceRemotingClient.SendOneWay(ServiceRemotingMessageHeaders messageHeaders, byte[] requestBody)
    {
        messageHeaders.SetUser(ServiceRequestContext.Current.User);
        messageHeaders.SetCorrelationId(ServiceRequestContext.Current.CorrelationId);
        this._innerClient.SendOneWay(messageHeaders, requestBody);
    }

    public ResolvedServicePartition ResolvedServicePartition
    {
        get { return this._innerClient.ResolvedServicePartition; }
        set { this._innerClient.ResolvedServicePartition = value; }
    }

    public string ListenerName
    {
        get { return this._innerClient.ListenerName; }
        set { this._innerClient.ListenerName = value; }
    }
    public ResolvedServiceEndpoint Endpoint
    {
        get { return this._innerClient.Endpoint; }
        set { this._innerClient.Endpoint = value; }
    }
}

现在,这里是我们实际(也是最终)设置要传递给服务的审核名称的地方。

调用链和服务请求上下文

拼图的最后一块,ServiceRequestContext,它是一个自定义 class,允许我们处理服务请求调用的环境上下文。这是相关的,因为它为我们提供了一种在调用链中传播上下文信息的简单方法,例如用户或相关 ID(或我们希望在客户端和服务之间传递的任何其他 header 信息)。实现 ServiceRequestContext 看起来像:

public sealed class ServiceRequestContext
{
    private static readonly string ContextKey = Guid.NewGuid().ToString();

    public ServiceRequestContext(Guid correlationId, string user)
    {
        this.CorrelationId = correlationId;
        this.User = user;
    }

    public Guid CorrelationId { get; private set; }

    public string User { get; private set; }

    public static ServiceRequestContext Current
    {
        get { return (ServiceRequestContext)CallContext.LogicalGetData(ContextKey); }
        internal set
        {
            if (value == null)
            {
                CallContext.FreeNamedDataSlot(ContextKey);
            }
            else
            {
                CallContext.LogicalSetData(ContextKey, value);
            }
        }
    }

    public static Task RunInRequestContext(Func<Task> action, Guid correlationId, string user)
    {
        Task<Task> task = null;
        task = new Task<Task>(async () =>
        {
            Debug.Assert(ServiceRequestContext.Current == null);
            ServiceRequestContext.Current = new ServiceRequestContext(correlationId, user);
            try
            {
                await action();
            }
            finally
            {
                ServiceRequestContext.Current = null;
            }
        });
        task.Start();
        return task.Unwrap();
    }

    public static Task<TResult> RunInRequestContext<TResult>(Func<Task<TResult>> action, Guid correlationId, string user)
    {
        Task<Task<TResult>> task = null;
        task = new Task<Task<TResult>>(async () =>
        {
            Debug.Assert(ServiceRequestContext.Current == null);
            ServiceRequestContext.Current = new ServiceRequestContext(correlationId, user);
            try
            {
                return await action();
            }
            finally
            {
                ServiceRequestContext.Current = null;
            }
        });
        task.Start();
        return task.Unwrap<TResult>();
    }
}

这最后一部分深受 SO answer by Stephen Cleary 的影响。它为我们提供了一种简单的方法来处理调用层次结构中的环境信息,无论它们在任务上是同步的还是异步的。现在,有了这个,我们也可以在服务的 Dispatcher 中设置该信息CE侧:

    public override Task<byte[]> RequestResponseAsync(
        IServiceRemotingRequestContext requestContext, 
        ServiceRemotingMessageHeaders messageHeaders, 
        byte[] requestBody)
    {
        var user = messageHeaders.GetUser();
        var correlationId = messageHeaders.GetCorrelationId();

        return ServiceRequestContext.RunInRequestContext(async () => 
            await base.RequestResponseAsync(
                requestContext, 
                messageHeaders, 
                requestBody), 
            correlationId, user);
    }

GetUser()GetCorrelationId() 只是获取和解压缩客户端设置的 header 的辅助方法)

有了这个意味着服务为任何附加调用创建的任何新客户端也将设置 sam headers,所以在场景 ServiceA -> ServiceB -> ServiceC 中我们仍然会有ServiceB调用ServiceC时设置的同一个用户

什么?有那么容易吗?是的 ;)

从服务内部,例如无状态 OWIN 网络 api,您首先在其中捕获用户信息,然后创建 ServiceProxyFactory 的实例并将该调用包装在 ServiceRequestContext:

var task = ServiceRequestContext.RunInRequestContext(async () =>
{
    var serviceA = ServiceProxyFactory.CreateServiceProxy<IServiceA>(new Uri($"{FabricRuntime.GetActivationContext().ApplicationName}/ServiceA"));
    await serviceA.DoStuffAsync(CancellationToken.None);
}, Guid.NewGuid(), user);

好的,总结一下 - 您可以连接到远程服务来设置您自己的 headers。正如我们在上面看到的,需要完成一些工作才能为该机制建立适当的机制,主要是创建您自己的底层基础设施的子classes。好处是,一旦您准备就绪,您就可以很容易地审计您的服务调用。