如何发送到网络 api 控制器中的特定信号器客户端?
How to send to a specific signalr client in web api controller?
我想向特定客户端发送数据。我有 Asp.net 核心网络 api(.Net-6.0) 控制器,它有一个 Hub 帮助调用远程 Worker 服务上的方法。 Hub 主动向特定的 Worker clients 一个一个发送调用。 我如何以及在哪里保存 connectionId 和相应的 WorkerID 以便每当 MiniAppController 收到请求时,它都会使用 hubContext 通过正确的连接触发请求。代码示例是:
public class ChatHub : Hub
{
private readonly ILogger<ChatHub> _logger;
public ChatHub(ILogger<ChatHub> logger)
{
_logger = logger;
}
public async Task HandShake(string workerId, string message)
{
HubCallerContext context = this.Context;
await Clients.Caller.SendAsync("HandShake", workerId, context.ConnectionId);
}
public override async Task OnConnectedAsync()
{
await Groups.AddToGroupAsync(Context.ConnectionId, "SignalR Users");
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception exception)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, "SignalR Users");
_logger.LogInformation($"1.Server: Client disconnected and left the group..............");
await base.OnDisconnectedAsync(exception);
}
}
网络api 控制器:
[Route("api/[controller]")]
[ApiController]
public class MiniAppController : ControllerBase
{
private readonly IHubContext<ChatHub> _chatHubContext;
private readonly ILogger<ChatHub> _logger;
public MiniAppController(IHubContext<ChatHub> chatHubContext)
{
_chatHubContext = chatHubContext;
}
[HttpGet]
public async Task<ActionResult<CheckoutInfo>> Checkout(string comID, string parkServerID, string parkLotID, string parkID, string miniAppID, string miniUserID, string sign)
{
string workerId = comID + parkServerID + parkLotID;//extracted from the method arguments
***//how to use workerId to send to a specific client???***
......
}
}
Worker 服务作为 SignalR 客户端,我可以有多个 worker:
public class Worker1 : BackgroundService
{
private readonly ILogger<Worker1> _logger;
private HubConnection _connection;
public Worker1(ILogger<Worker1> logger)
{
_logger = logger;
_connection = new HubConnectionBuilder()
.WithUrl("http://localhost:5106/chatHub")
.WithAutomaticReconnect()
.Build();
_connection.On<string, string>("HandShakeAck", HandShakeAck);
_connection.On<string, string>("ReceiveMessage", ReceiveMessage);
_connection.On<CheckoutRequest>("Checkout", Checkout);
}
public Task Checkout(CheckoutRequest checkoutRequest)
{
//send Checkoutinfo back
CheckoutInfo checkoutInfo = new CheckoutInfo();
_connection.InvokeAsync("ReceiveCheckoutInfo", workerId, checkoutInfo);
return Task.CompletedTask;
}
}
请帮忙。谢谢
我认为最好的方法是跟踪 signalR 组中的连接。每次建立连接时,我们都需要根据建立连接的 workerId 对其进行分组。最好在 onConnectedAsync 方法中执行此操作,因为这样我们就不必在每次重置连接时都手动执行此操作。
但是我们如何知道在 onConnectedAsync 方法中连接的是哪个 worker?通过使用访问令牌,我在我的应用程序中知道哪个用户正在连接的方式相同。
不过要提到的一件事是,在使用此访问令牌时,SignalR 会在使用 websocket 进行连接时将其作为查询参数。如果您有 IIS 记录您的活动连接并且您认为 worker id 敏感,您可能希望也可能不希望这样做。
(使用长轮询或 SSE 时,访问令牌将在请求的 header 中发送)。
因此:
您可以在启动连接时将 worker id 作为访问令牌传递。
_connection = new HubConnectionBuilder()
.WithUrl("http://localhost:5106/chatHub", options =>
{
options.AccessTokenProvider = () => // pass worker id here;
})
.WithAutomaticReconnect()
.Build();
注意:您可以选择加密工作人员 ID 并在服务器端对其解密。
如果您不想将 workerId 与访问令牌相关联,那么您也可以将其作为硬编码查询参数传递。 (尽管这样做会保留它作为所有 3 种类型的 signalR 连接的查询参数)。
_connection = new HubConnectionBuilder()
.WithUrl($"http://localhost:5106/chatHub?workerId={workerId}")
.WithAutomaticReconnect()
.Build();
您也可以使用完全成熟的 JWT 令牌,如果愿意,也可以将 workerId 嵌入 JWT 令牌中。
下一步将是在 onConnectedAsync 方法中获取此 worker id。为此,我们需要:
- workerId中间件(获取workerId)
- workerId 服务(存储和访问 workerId)
- workerId 要求属性(强制某些集线器方法存在 workerId)
- WorkerId 中间件结果处理程序(如果 workerId 要求失败必须发生什么)
WorkerIdMiddleware 可以获取每个请求的 worker id 并将其存储在请求上下文中:
WorkerIdMiddleware.cs
public class WorkerIdMiddleware
{
private readonly RequestDelegate _next;
public WorkerIdMiddleware(RequestDelegate next)
{
_next = next;
}
public async Task Invoke(HttpContext httpContext)
{
var workerId = httpContext.Request.Query["access_token"];
if (!string.IsNullOrEmpty(workerId))
{
AttachWorkerIdToContext(httpContext, workerId);
}
await _next(httpContext);
}
private void AttachWorkerIdToContext(HttpContext httpContext, string workerId)
{
if (ValidWorkerId(workerId))
{
httpContext.Items["WorkerId"] = workerId;
}
}
private bool ValidWorkerId(string workerId)
{
// Validate the worker id if you need to
}
}
然后我们可以通过 WorkerIdService 访问 workerId:
WorkerIdService.cs
public class WorkerIdService
{
private string _currentWorkerId;
private readonly IHttpContextAccessor _httpContextAccessor;
public WorkerIdService(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
_currentWorkerId = GetCurrentWorkerIdFromHttpContext();
}
public string CurrentWorkerId
{
get
{
if (_currentWorkerId == null)
{
_currentWorkerId = GetCurrentWorkerIdFromHttpContext();
}
return _currentWorkerId;
}
}
private string GetCurrentWorkerIdFromHttpContext()
{
return (string)_httpContextAccessor.HttpContext?.Items?["WorkerId"];
}
}
workerId 需求和需求处理程序将使我们能够保护我们的 signalR 方法并确保在我们需要时传递 worker id:
ChatHubWorkerIdRequirement.cs
using Microsoft.AspNetCore.Authorization;
public class ChatHubWorkerIdRequirement : IAuthorizationRequirement
{
}
ChatHubWorkerIdHandler.cs
public class ChatHubWorkerIdHandler : AuthorizationHandler<ChatHubWorkerIdRequirement>
{
readonly IHttpContextAccessor _httpContextAccessor;
public ChatHubWorkerIdHandler(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
protected override Task HandleRequirementAsync(AuthorizationHandlerContext context, ChatHubWorkerIdRequirement requirement)
{
var workerId = (string)_httpContextAccessor.HttpContext.Items["WorkerId"];
if (workerId != null)
{
// Connection may proceed successfully
context.Succeed(requirement);
}
// Return completed task
return Task.CompletedTask;
}
}
为了自定义当workerId需求失败时响应的状态码,我们可以使用一个AuthorizationMiddlewareResultHandler
HubWorkerIdResponseHandler.cs
public class HubWorkerIdResponseHandler : IAuthorizationMiddlewareResultHandler
{
private readonly IAuthorizationMiddlewareResultHandler _handler;
public HubWorkerIdResponseHandler()
{
_handler = new AuthorizationMiddlewareResultHandler();
}
public async Task HandleAsync(
RequestDelegate requestDelegate,
HttpContext httpContext,
AuthorizationPolicy authorizationPolicy,
PolicyAuthorizationResult policyAuthorizationResult)
{
if (IsFailedPolicy(policyAuthorizationResult) && IsHubWorkerIdPolicy(authorizationPolicy))
{
// return whatever status code you wish if the hub is connected to without a worker id
httpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
return;
}
await _handler.HandleAsync(requestDelegate, httpContext, authorizationPolicy, policyAuthorizationResult);
}
private static bool IsFailedPolicy(PolicyAuthorizationResult policyAuthorizationResult)
{
return !policyAuthorizationResult.Succeeded;
}
private static bool IsHubWorkerIdPolicy(AuthorizationPolicy authorizationPolicy)
{
return authorizationPolicy.Requirements.OfType<ChatHubWorkerIdRequirement>().Any();
}
}
最后,您需要像这样在您的启动中注册所有内容:
public void ConfigureServices(IServiceCollection services)
{
...
// Add the workerId policy
services.AddSingleton<IAuthorizationHandler, ChatHubWorkerIdHandler>();
services.AddAuthorization(options =>
{
options.AddPolicy("WorkerIdPolicy", policy =>
{
policy.Requirements.Add(new ChatHubWorkerIdRequirement());
});
});
// Hub Policy failure response handler (this will handle the failed requirement above)
services.AddSingleton<IAuthorizationMiddlewareResultHandler, HubWorkerIdResponseHandler>();
services.AddSignalR();
services.AddHttpContextAccessor();
services.AddScoped<IWorkerIdService, WorkerIdService>();
}
public void Configure(IApplicationBuilder app)
{
...
app.UseMiddleware<JwtMiddleware>();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
...
endpoints.MapHub<ChatHub>("ChatHub");
);
...
}
您现在可以使用我们创建的新授权策略属性装饰您的 ChatHub。通过装饰整个集线器 class,将在 onConnectedAsync 方法期间评估策略。
(如果您希望策略基于方法触发,则需要使用 workerId 策略属性装饰每个单独的方法)
[Authorize(Policy = "WorkerIdPolicy")]
public class ChatHub : Hub
{
....
}
然后您可以在 onConnectedAsync 方法期间从 WorkerIdService 访问 CurrentWorkerId:
public override async Task OnConnectedAsync()
{
await Groups.AddToGroupAsync(Context.ConnectionId, "SignalR Users");
// group the connections by workerId
await Groups.AddToGroupAsync(Context.ConnectionId, $"Worker-{_workerIdService.CurrentWorkerId}");
await base.OnConnectedAsync();
}
一切就绪后,您将能够使用 workerId 向该工作组发送信号,并且知道只有具有该 workerId 的 client/s 会收到它。
我想向特定客户端发送数据。我有 Asp.net 核心网络 api(.Net-6.0) 控制器,它有一个 Hub 帮助调用远程 Worker 服务上的方法。 Hub 主动向特定的 Worker clients 一个一个发送调用。 我如何以及在哪里保存 connectionId 和相应的 WorkerID 以便每当 MiniAppController 收到请求时,它都会使用 hubContext 通过正确的连接触发请求。代码示例是:
public class ChatHub : Hub
{
private readonly ILogger<ChatHub> _logger;
public ChatHub(ILogger<ChatHub> logger)
{
_logger = logger;
}
public async Task HandShake(string workerId, string message)
{
HubCallerContext context = this.Context;
await Clients.Caller.SendAsync("HandShake", workerId, context.ConnectionId);
}
public override async Task OnConnectedAsync()
{
await Groups.AddToGroupAsync(Context.ConnectionId, "SignalR Users");
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception exception)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, "SignalR Users");
_logger.LogInformation($"1.Server: Client disconnected and left the group..............");
await base.OnDisconnectedAsync(exception);
}
}
网络api 控制器:
[Route("api/[controller]")]
[ApiController]
public class MiniAppController : ControllerBase
{
private readonly IHubContext<ChatHub> _chatHubContext;
private readonly ILogger<ChatHub> _logger;
public MiniAppController(IHubContext<ChatHub> chatHubContext)
{
_chatHubContext = chatHubContext;
}
[HttpGet]
public async Task<ActionResult<CheckoutInfo>> Checkout(string comID, string parkServerID, string parkLotID, string parkID, string miniAppID, string miniUserID, string sign)
{
string workerId = comID + parkServerID + parkLotID;//extracted from the method arguments
***//how to use workerId to send to a specific client???***
......
}
}
Worker 服务作为 SignalR 客户端,我可以有多个 worker:
public class Worker1 : BackgroundService
{
private readonly ILogger<Worker1> _logger;
private HubConnection _connection;
public Worker1(ILogger<Worker1> logger)
{
_logger = logger;
_connection = new HubConnectionBuilder()
.WithUrl("http://localhost:5106/chatHub")
.WithAutomaticReconnect()
.Build();
_connection.On<string, string>("HandShakeAck", HandShakeAck);
_connection.On<string, string>("ReceiveMessage", ReceiveMessage);
_connection.On<CheckoutRequest>("Checkout", Checkout);
}
public Task Checkout(CheckoutRequest checkoutRequest)
{
//send Checkoutinfo back
CheckoutInfo checkoutInfo = new CheckoutInfo();
_connection.InvokeAsync("ReceiveCheckoutInfo", workerId, checkoutInfo);
return Task.CompletedTask;
}
}
请帮忙。谢谢
我认为最好的方法是跟踪 signalR 组中的连接。每次建立连接时,我们都需要根据建立连接的 workerId 对其进行分组。最好在 onConnectedAsync 方法中执行此操作,因为这样我们就不必在每次重置连接时都手动执行此操作。
但是我们如何知道在 onConnectedAsync 方法中连接的是哪个 worker?通过使用访问令牌,我在我的应用程序中知道哪个用户正在连接的方式相同。
不过要提到的一件事是,在使用此访问令牌时,SignalR 会在使用 websocket 进行连接时将其作为查询参数。如果您有 IIS 记录您的活动连接并且您认为 worker id 敏感,您可能希望也可能不希望这样做。 (使用长轮询或 SSE 时,访问令牌将在请求的 header 中发送)。
因此:
您可以在启动连接时将 worker id 作为访问令牌传递。
_connection = new HubConnectionBuilder()
.WithUrl("http://localhost:5106/chatHub", options =>
{
options.AccessTokenProvider = () => // pass worker id here;
})
.WithAutomaticReconnect()
.Build();
注意:您可以选择加密工作人员 ID 并在服务器端对其解密。
如果您不想将 workerId 与访问令牌相关联,那么您也可以将其作为硬编码查询参数传递。 (尽管这样做会保留它作为所有 3 种类型的 signalR 连接的查询参数)。
_connection = new HubConnectionBuilder()
.WithUrl($"http://localhost:5106/chatHub?workerId={workerId}")
.WithAutomaticReconnect()
.Build();
您也可以使用完全成熟的 JWT 令牌,如果愿意,也可以将 workerId 嵌入 JWT 令牌中。
下一步将是在 onConnectedAsync 方法中获取此 worker id。为此,我们需要:
- workerId中间件(获取workerId)
- workerId 服务(存储和访问 workerId)
- workerId 要求属性(强制某些集线器方法存在 workerId)
- WorkerId 中间件结果处理程序(如果 workerId 要求失败必须发生什么)
WorkerIdMiddleware 可以获取每个请求的 worker id 并将其存储在请求上下文中:
WorkerIdMiddleware.cs
public class WorkerIdMiddleware
{
private readonly RequestDelegate _next;
public WorkerIdMiddleware(RequestDelegate next)
{
_next = next;
}
public async Task Invoke(HttpContext httpContext)
{
var workerId = httpContext.Request.Query["access_token"];
if (!string.IsNullOrEmpty(workerId))
{
AttachWorkerIdToContext(httpContext, workerId);
}
await _next(httpContext);
}
private void AttachWorkerIdToContext(HttpContext httpContext, string workerId)
{
if (ValidWorkerId(workerId))
{
httpContext.Items["WorkerId"] = workerId;
}
}
private bool ValidWorkerId(string workerId)
{
// Validate the worker id if you need to
}
}
然后我们可以通过 WorkerIdService 访问 workerId:
WorkerIdService.cs
public class WorkerIdService
{
private string _currentWorkerId;
private readonly IHttpContextAccessor _httpContextAccessor;
public WorkerIdService(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
_currentWorkerId = GetCurrentWorkerIdFromHttpContext();
}
public string CurrentWorkerId
{
get
{
if (_currentWorkerId == null)
{
_currentWorkerId = GetCurrentWorkerIdFromHttpContext();
}
return _currentWorkerId;
}
}
private string GetCurrentWorkerIdFromHttpContext()
{
return (string)_httpContextAccessor.HttpContext?.Items?["WorkerId"];
}
}
workerId 需求和需求处理程序将使我们能够保护我们的 signalR 方法并确保在我们需要时传递 worker id:
ChatHubWorkerIdRequirement.cs
using Microsoft.AspNetCore.Authorization;
public class ChatHubWorkerIdRequirement : IAuthorizationRequirement
{
}
ChatHubWorkerIdHandler.cs
public class ChatHubWorkerIdHandler : AuthorizationHandler<ChatHubWorkerIdRequirement>
{
readonly IHttpContextAccessor _httpContextAccessor;
public ChatHubWorkerIdHandler(IHttpContextAccessor httpContextAccessor)
{
_httpContextAccessor = httpContextAccessor;
}
protected override Task HandleRequirementAsync(AuthorizationHandlerContext context, ChatHubWorkerIdRequirement requirement)
{
var workerId = (string)_httpContextAccessor.HttpContext.Items["WorkerId"];
if (workerId != null)
{
// Connection may proceed successfully
context.Succeed(requirement);
}
// Return completed task
return Task.CompletedTask;
}
}
为了自定义当workerId需求失败时响应的状态码,我们可以使用一个AuthorizationMiddlewareResultHandler
HubWorkerIdResponseHandler.cs
public class HubWorkerIdResponseHandler : IAuthorizationMiddlewareResultHandler
{
private readonly IAuthorizationMiddlewareResultHandler _handler;
public HubWorkerIdResponseHandler()
{
_handler = new AuthorizationMiddlewareResultHandler();
}
public async Task HandleAsync(
RequestDelegate requestDelegate,
HttpContext httpContext,
AuthorizationPolicy authorizationPolicy,
PolicyAuthorizationResult policyAuthorizationResult)
{
if (IsFailedPolicy(policyAuthorizationResult) && IsHubWorkerIdPolicy(authorizationPolicy))
{
// return whatever status code you wish if the hub is connected to without a worker id
httpContext.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
return;
}
await _handler.HandleAsync(requestDelegate, httpContext, authorizationPolicy, policyAuthorizationResult);
}
private static bool IsFailedPolicy(PolicyAuthorizationResult policyAuthorizationResult)
{
return !policyAuthorizationResult.Succeeded;
}
private static bool IsHubWorkerIdPolicy(AuthorizationPolicy authorizationPolicy)
{
return authorizationPolicy.Requirements.OfType<ChatHubWorkerIdRequirement>().Any();
}
}
最后,您需要像这样在您的启动中注册所有内容:
public void ConfigureServices(IServiceCollection services)
{
...
// Add the workerId policy
services.AddSingleton<IAuthorizationHandler, ChatHubWorkerIdHandler>();
services.AddAuthorization(options =>
{
options.AddPolicy("WorkerIdPolicy", policy =>
{
policy.Requirements.Add(new ChatHubWorkerIdRequirement());
});
});
// Hub Policy failure response handler (this will handle the failed requirement above)
services.AddSingleton<IAuthorizationMiddlewareResultHandler, HubWorkerIdResponseHandler>();
services.AddSignalR();
services.AddHttpContextAccessor();
services.AddScoped<IWorkerIdService, WorkerIdService>();
}
public void Configure(IApplicationBuilder app)
{
...
app.UseMiddleware<JwtMiddleware>();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
...
endpoints.MapHub<ChatHub>("ChatHub");
);
...
}
您现在可以使用我们创建的新授权策略属性装饰您的 ChatHub。通过装饰整个集线器 class,将在 onConnectedAsync 方法期间评估策略。
(如果您希望策略基于方法触发,则需要使用 workerId 策略属性装饰每个单独的方法)
[Authorize(Policy = "WorkerIdPolicy")]
public class ChatHub : Hub
{
....
}
然后您可以在 onConnectedAsync 方法期间从 WorkerIdService 访问 CurrentWorkerId:
public override async Task OnConnectedAsync()
{
await Groups.AddToGroupAsync(Context.ConnectionId, "SignalR Users");
// group the connections by workerId
await Groups.AddToGroupAsync(Context.ConnectionId, $"Worker-{_workerIdService.CurrentWorkerId}");
await base.OnConnectedAsync();
}
一切就绪后,您将能够使用 workerId 向该工作组发送信号,并且知道只有具有该 workerId 的 client/s 会收到它。