如何在 Controller 以外的地方调用 SignalR Clients.All.InvokeAsync()?

How to invoke SignalR Clients.All.InvokeAsync() in places other than the Controller?

我可以通过构造函数中的 DI 在我的 .NET Core WebAPI 的控制器中访问我的 IHubContext<MyHub> 很好和花花公子,但我也想在其他地方访问它。

具体来说,当我使用来自 RabbitMQ 的消息时,有时我想通过 _myHubContext.Clients.All.InvokeAsync() 更新客户端,但我就是不知道如何获取它。

我在查找有关在控制器之外执行此类操作的文档时也遇到了问题。

如有任何帮助,我们将不胜感激。

编辑:

要添加一些细节,以及我的问题可能源自何处,我正在尝试访问 Startup 中的 IHubContext(以及我在 ConfigureServices 中注册的一些自己的服务) class,特别是在 IApplicationLifetime ApplicationStartedApplicationStopped 期间调用 RabbitMQ 消费者的方法来连接和断开连接。

我的猜测是正确的,也许我无法访问 Startup class 中的注册服务?如果是这样,我将如何启动这些服务?

更新:

services.AddSignalR() 和一些在启动时调用的服务向上移动到 Program.cs 中的 WebHost.ConfigureServices 解决了我的一些问题,但当然还有更多问题。

当我收到来自 RabbitMQ 的消息时,我的 JS 客户端没有收到任何消息,但我的客户端已成功连接。 "Weird..."我想。为了获得更多信息,我在我的控制器中连接了一个 GET 操作以通过 SignalR Hub 发送一些内容。每当我调用 GET 时,它都会工作...... IHubContext<MyHub>。我通过我的 RabbitMQ 侦听器中的构造函数获得 hubContext,就像我对控制器所做的那样。

新问题:控制器中注入的东西与我在启动时自己注册的服务中注入的东西是否不同?怎么会这样,我该如何克服它?

一些代码...

摘自Program.cs

public static IWebHost BuildWebHost(string[] args) =>
    WebHost.CreateDefaultBuilder(args)
        .UseKestrel()
        .UseIISIntegration()
        .ConfigureServices(services => {
            services.AddSignalR();
            services.AddTransient<ISubscriber, Subscriber>();
            services.AddTransient<IDataService, DataService>();
            services.AddTransient<IHealthCheckProcessor, HealthCheckProcessor>();
            services.AddTransient<INodeProcessor, NodeProcessor>();
        })
        .UseStartup<Startup>()
        .Build();

来自Startup.cs

public class Startup
{
    public Startup(IConfiguration _configuration, ISubscriber _subscriber)
    {
        configuration = _configuration;
        subscriber = _subscriber;
    }
    public IConfiguration configuration { get; }
    public ISubscriber subscriber { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddCors();
        services.AddMvc();
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime applicationLifetime)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        app.UseCors(builder => builder
            // CORS stuff);

        app.UseSignalR(routes =>
        {
            routes.MapHub<StatusHub>("Status");
        });
        app.UseMvc();
        applicationLifetime.ApplicationStarted.Register(OnStartup);
        applicationLifetime.ApplicationStopping.Register(OnShutdown);
    }

    private void OnStartup() {
        // MessageBroker stuff
        subscriber.Start(messageBroker);
    }

    private void OnShutdown() {
        subscriber.Stop();
    }
}

来自Subscriber.cs

public class Subscriber : ISubscriber {
    public static IConnection connection;
    public static IModel channel;
    public IHubContext<StatusHub> hubContext;

    public static IHealthCheckProcessor healthCheckProcessor;
    public static INodeProcessor nodeProcessor;

    public Subscriber(IHubContext<StatusHub> _hubContext, INodeProcessor _nodeProcessor, IHealthCheckProcessor _healthCheckProcessor)
    {
        connection = new ConnectionFactory().CreateConnection();
        channel = connection.CreateModel();
        hubContext = _hubContext;
        nodeProcessor = _nodeProcessor;
        healthCheckProcessor = _healthCheckProcessor;
    }

    public void Start(MessageBroker messageBroker)
    {
        var factory = new ConnectionFactory() { HostName = messageBroker.URL }.CreateConnection();

        foreach (Queue queue in messageBroker.Queues)
        {
            channel.QueueDeclare(
                queue: queue.Name,
                durable: queue.Durable,
                exclusive: queue.Exclusive,
                autoDelete: queue.AutoDelete,
                arguments: null
            );

            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) =>
            {
                byte[] body = ea.Body;
                string message = Encoding.UTF8.GetString(body);

                RouteMessage(queue, message);
            };

            channel.BasicConsume(
                    queue: queue.Name,
                    autoAck: queue.AutoAck,
                    consumer: consumer
                );
            hubContext.Clients.All.InvokeAsync("Send", "It worked - from the subscriber");
        }
    }

    public void RouteMessage(Queue queue, string message) {
        if(queue.Name == "discovery") {
            nodeProcessor.Process(message);
        }
        if(queue.Name == "health") {
            healthCheckProcessor.Process(message);
        }
    }

    public void Stop()
    {
        Console.WriteLine("Terminating connection to RabbitMQ instance.");
        channel.Close(200, "Goodbye");
        connection.Close();
    }
}

来自HealthCheckProcessor.cs

public class HealthCheckProcessor : IHealthCheckProcessor {
    private IDataService dataService;
    private IHubContext<StatusHub> hubContext;

    public HealthCheckProcessor(IDataService _dataService, IHubContext<StatusHub> _hubContext)
    {
        dataService = _dataService;
        hubContext = _hubContext;
    }
    public void Process(string message) {
        HealthCheck health = JsonConvert.DeserializeObject<HealthCheck>(message);
        Node node = dataService.GetSingle(health.NodeId);
        node.Health = health;

        dataService.Update(node);
        Console.WriteLine("It's sending.");
        hubContext.Clients.All.InvokeAsync("Send", "It worked - from the processor");
    }
}

来自控制器:

[Route("api/[controller]")]
public class MyController: Controller
{
    private IDataService _dataService;
    private readonly IConfiguration configuration;
    private static IHubContext<StatusHub> hubContext;

    public NodesController(IConfiguration config, IDataService dataService, IHubContext<StatusHub> _hubContext)
    {
        _dataService = dataService;
        configuration = config;
        hubContext = _hubContext;
    }

    [HttpGet]
    public string Get()
    {
        hubContext.Clients.All.InvokeAsync("Send", "Blarg!");
        return "Well, I tried.";
    }
}

您正在尝试访问您请求时不可用的服务。

ConfigureConfigureServices 之后调用,以便可以访问任何已注册的服务。

public class Startup {
    public Startup(IConfiguration _configuration) {
        configuration = _configuration;
    }

    public IConfiguration configuration { get; }

    public void ConfigureServices(IServiceCollection services) {        
        services.AddCors();
        services.AddMvc();

        services.AddSignalR();
        services.AddTransient<ISubscriber, Subscriber>();
        services.AddTransient<IDataService, DataService>();
        services.AddTransient<IHealthCheckProcessor, HealthCheckProcessor>();
        services.AddTransient<INodeProcessor, NodeProcessor>();
    }

    public void Configure(
        IApplicationBuilder app, 
        IHostingEnvironment env, 
        IApplicationLifetime applicationLifetime, 
        IServiceProvider sp
    ) {
        if (env.IsDevelopment()) {
            app.UseDeveloperExceptionPage();
        }
        app.UseCors(builder => builder
            // CORS stuff);

        app.UseMvc();

        app.UseSignalR(routes => {
            routes.MapHub<StatusHub>("Status");
        });

        //At this point all the necessary dependencies have been registered and configured
        var subscriber = sp.GetService<ISubscriber>();

        applicationLifetime.ApplicationStarted.Register(() => OnStartup(subscriber));
        applicationLifetime.ApplicationStopping.Register(() => OnShutdown(subscriber));
    }

    private void OnStartup(ISubscriber subscriber) {
        // MessageBroker stuff
        subscriber.Start(messageBroker);
    }

    private void OnShutdown(ISubscriber subscriber) {
        subscriber.Stop();
    }
}

您现在应该可以在构建主机时删除便利 ConfigureServices

public static IWebHost BuildWebHost(string[] args) =>
    WebHost.CreateDefaultBuilder(args)
        .UseKestrel()
        .UseIISIntegration()
        .UseStartup<Startup>()
        .Build();