如何发送到网络 api 控制器中的特定信号器客户端?

How to send to a specific signalr client in web api controller?

我想向特定客户端发送数据。我有 Asp.net 核心网络 api(.Net-6.0) 控制器,它有一个 Hub 帮助调用远程 Worker 服务上的方法。 Hub 主动向特定的 Worker clients 一个一个发送调用。 我如何以及在哪里保存 connectionId 和相应的 WorkerID 以便每当 MiniAppController 收到请求时,它都会使用 hubContext 通过正确的连接触发请求。代码示例是:

public class ChatHub : Hub
{
    private readonly ILogger<ChatHub> _logger;
    public ChatHub(ILogger<ChatHub> logger)
    {
        _logger = logger;
    }

    public async Task HandShake(string workerId, string message)
    {
        HubCallerContext context = this.Context;
        await Clients.Caller.SendAsync("HandShake", workerId, context.ConnectionId);
    }

    public override async Task OnConnectedAsync()
    {
        await Groups.AddToGroupAsync(Context.ConnectionId, "SignalR Users");

        await base.OnConnectedAsync();
    }
    public override async Task OnDisconnectedAsync(Exception exception)
    {
        await Groups.RemoveFromGroupAsync(Context.ConnectionId, "SignalR Users");
        _logger.LogInformation($"1.Server: Client disconnected  and left the group..............");
        await base.OnDisconnectedAsync(exception);
    }
} 

网络api 控制器:

    [Route("api/[controller]")]
[ApiController]
public class MiniAppController : ControllerBase
{
    private readonly IHubContext<ChatHub> _chatHubContext;
    private readonly ILogger<ChatHub> _logger;

    public MiniAppController(IHubContext<ChatHub> chatHubContext)
    {
        _chatHubContext = chatHubContext;
    }
    [HttpGet]
    public async Task<ActionResult<CheckoutInfo>> Checkout(string comID, string parkServerID, string parkLotID, string parkID, string miniAppID, string miniUserID, string sign)
    {
        string workerId = comID + parkServerID + parkLotID;//extracted from the method arguments
        ***//how to use workerId to send to a specific client???***
        ......
    }
}

Worker 服务作为 SignalR 客户端,我可以有多个 worker:

    public class Worker1 : BackgroundService
{
    private readonly ILogger<Worker1> _logger;
    private HubConnection _connection;

    public Worker1(ILogger<Worker1> logger)
    {
        _logger = logger;

        _connection = new HubConnectionBuilder()
            .WithUrl("http://localhost:5106/chatHub")
            .WithAutomaticReconnect()
            .Build();

        _connection.On<string, string>("HandShakeAck", HandShakeAck);
        _connection.On<string, string>("ReceiveMessage", ReceiveMessage);
        _connection.On<CheckoutRequest>("Checkout", Checkout);
    }
    public Task Checkout(CheckoutRequest checkoutRequest)
    {
        //send Checkoutinfo back
        CheckoutInfo checkoutInfo = new CheckoutInfo();
        _connection.InvokeAsync("ReceiveCheckoutInfo", workerId, checkoutInfo);
        return Task.CompletedTask;
    }
}

请帮忙。谢谢

我认为最好的方法是跟踪 signalR 组中的连接。每次建立连接时,我们都需要根据建立连接的 workerId 对其进行分组。最好在 onConnectedAsync 方法中执行此操作,因为这样我们就不必在每次重置连接时都手动执行此操作。

但是我们如何知道在 onConnectedAsync 方法中连接的是哪个 worker?通过使用访问令牌,我在我的应用程序中知道哪个用户正在连接的方式相同。

不过要提到的一件事是,在使用此访问令牌时,SignalR 会在使用 websocket 进行连接时将其作为查询参数。如果您有 IIS 记录您的活动连接并且您认为 worker id 敏感,您可能希望也可能不希望这样做。 (使用长轮询或 SSE 时,访问令牌将在请求的 header 中发送)。

因此:

您可以在启动连接时将 worker id 作为访问令牌传递。

    _connection = new HubConnectionBuilder()
        .WithUrl("http://localhost:5106/chatHub", options =>
         { 
            options.AccessTokenProvider = () => // pass worker id here;
         })
        .WithAutomaticReconnect()
        .Build();

注意:您可以选择加密工作人员 ID 并在服务器端对其解密。

如果您不想将 workerId 与访问令牌相关联,那么您也可以将其作为硬编码查询参数传递。 (尽管这样做会保留它作为所有 3 种类型的 signalR 连接的查询参数)。

    _connection = new HubConnectionBuilder()
        .WithUrl($"http://localhost:5106/chatHub?workerId={workerId}")
        .WithAutomaticReconnect()
        .Build();

您也可以使用完全成熟的 JWT 令牌,如果愿意,也可以将 workerId 嵌入 JWT 令牌中。

下一步将是在 onConnectedAsync 方法中获取此 worker id。为此,我们需要:

  • workerId中间件(获取workerId)
  • workerId 服务(存储和访问 workerId)
  • workerId 要求属性(强制某些集线器方法存在 workerId)
  • WorkerId 中间件结果处理程序(如果 workerId 要求失败必须发生什么)
WorkerIdMiddleware 可以获取每个请求的 worker id 并将其存储在请求上下文中:

WorkerIdMiddleware.cs

public class WorkerIdMiddleware
{
    private readonly RequestDelegate _next;

    public WorkerIdMiddleware(RequestDelegate next)
    {
        _next = next;
    }

    public async Task Invoke(HttpContext httpContext)
    {
        var workerId = httpContext.Request.Query["access_token"];

        if (!string.IsNullOrEmpty(workerId))
        {
            AttachWorkerIdToContext(httpContext, workerId);
        }

        await _next(httpContext);
    }

    private void AttachWorkerIdToContext(HttpContext httpContext, string workerId)
    {
        if (ValidWorkerId(workerId))
        {
            httpContext.Items["WorkerId"] = workerId;
        }
    }

    private bool ValidWorkerId(string workerId)
    {
        // Validate the worker id if you need to
    }
}
然后我们可以通过 WorkerIdService 访问 workerId:

WorkerIdService.cs

public class WorkerIdService
{
    private string _currentWorkerId;
    private readonly IHttpContextAccessor _httpContextAccessor;

    public WorkerIdService(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
        _currentWorkerId = GetCurrentWorkerIdFromHttpContext();
    }

    public string CurrentWorkerId
    {
        get
        {
            if (_currentWorkerId == null)
            {
                _currentWorkerId = GetCurrentWorkerIdFromHttpContext();
            }

            return _currentWorkerId;
        }
    }


    private string GetCurrentWorkerIdFromHttpContext()
    {
        return (string)_httpContextAccessor.HttpContext?.Items?["WorkerId"];
    }
}
workerId 需求和需求处理程序将使我们能够保护我们的 signalR 方法并确保在我们需要时传递 worker id:

ChatHubWorkerIdRequirement.cs

using Microsoft.AspNetCore.Authorization;

public class ChatHubWorkerIdRequirement : IAuthorizationRequirement
{
}

ChatHubWorkerIdHandler.cs

public class ChatHubWorkerIdHandler : AuthorizationHandler<ChatHubWorkerIdRequirement>
{
    readonly IHttpContextAccessor _httpContextAccessor;

    public ChatHubWorkerIdHandler(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }
    protected override Task HandleRequirementAsync(AuthorizationHandlerContext context, ChatHubWorkerIdRequirement requirement)
    {
        var workerId = (string)_httpContextAccessor.HttpContext.Items["WorkerId"];

        if (workerId != null)
        {
            // Connection may proceed successfully
            context.Succeed(requirement);
        }

        // Return completed task  
        return Task.CompletedTask;
    }
}
为了自定义当workerId需求失败时响应的状态码,我们可以使用一个AuthorizationMiddlewareResultHandler

HubWorkerIdResponseHandler.cs

public class HubWorkerIdResponseHandler : IAuthorizationMiddlewareResultHandler
{
    private readonly IAuthorizationMiddlewareResultHandler _handler;

    public HubWorkerIdResponseHandler()
    {
        _handler = new AuthorizationMiddlewareResultHandler();
    }

    public async Task HandleAsync(
        RequestDelegate requestDelegate,
        HttpContext httpContext,
        AuthorizationPolicy authorizationPolicy,
        PolicyAuthorizationResult policyAuthorizationResult)
    {
        if (IsFailedPolicy(policyAuthorizationResult) && IsHubWorkerIdPolicy(authorizationPolicy))
        {
            // return whatever status code you wish if the hub is connected to without a worker id
            httpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
            return;
        }

        await _handler.HandleAsync(requestDelegate, httpContext, authorizationPolicy, policyAuthorizationResult);
    }

    private static bool IsFailedPolicy(PolicyAuthorizationResult policyAuthorizationResult)
    {
        return !policyAuthorizationResult.Succeeded;
    }

    private static bool IsHubWorkerIdPolicy(AuthorizationPolicy authorizationPolicy)
    {
        return authorizationPolicy.Requirements.OfType<ChatHubWorkerIdRequirement>().Any();
    }
}
最后,您需要像这样在您的启动中注册所有内容:
    public void ConfigureServices(IServiceCollection services)
    { 
        ...
        // Add the workerId policy
        services.AddSingleton<IAuthorizationHandler, ChatHubWorkerIdHandler>();
        services.AddAuthorization(options =>
        {
            options.AddPolicy("WorkerIdPolicy", policy =>
            {
                policy.Requirements.Add(new ChatHubWorkerIdRequirement());
            });
        });

        // Hub Policy failure response handler (this will handle the failed requirement above)
        services.AddSingleton<IAuthorizationMiddlewareResultHandler, HubWorkerIdResponseHandler>();

        services.AddSignalR();

        services.AddHttpContextAccessor();

        services.AddScoped<IWorkerIdService, WorkerIdService>();
    }

    public void Configure(IApplicationBuilder app)
    {
       ...
       app.UseMiddleware<JwtMiddleware>();
       app.UseAuthorization();
       app.UseEndpoints(endpoints =>
          ...
          endpoints.MapHub<ChatHub>("ChatHub"); 
      );
       ...
    }
您现在可以使用我们创建的新授权策略属性装饰您的 ChatHub。通过装饰整个集线器 class,将在 onConnectedAsync 方法期间评估策略。

(如果您希望策略基于方法触发,则需要使用 workerId 策略属性装饰每个单独的方法)

[Authorize(Policy = "WorkerIdPolicy")]
public class ChatHub : Hub
{
    ....
}
然后您可以在 onConnectedAsync 方法期间从 WorkerIdService 访问 CurrentWorkerId:
public override async Task OnConnectedAsync()
{
    await Groups.AddToGroupAsync(Context.ConnectionId, "SignalR Users");
    // group the connections by workerId
    await Groups.AddToGroupAsync(Context.ConnectionId, $"Worker-{_workerIdService.CurrentWorkerId}");
    await base.OnConnectedAsync();
}

一切就绪后,您将能够使用 workerId 向该工作组发送信号,并且知道只有具有该 workerId 的 client/s 会收到它。