MassTransit 5.2,SignalR:如何在我的消费者中获取 IHubContext?
MassTransit 5.2, SignalR: How can i get the IHubContext inside my Consumer?
我的主要问题是获取正确的 SignalR 集线器实例。
上下文:我正在构建一个与几个外部系统通信的网络应用程序。我的应用程序中的 CRUD 操作导致更新外部系统的数据库。
在这个例子中我有 3 个服务 运行:
外部系统 |状态机 | .NET 核心网络API
当我 post 'create employee' 表单时,RabbitMQ 消息将从 WebAPI 发送到状态机。然后,状态机向更新数据库的外部系统服务发送几条创建消息。此后,它更新状态机以跟踪创建操作。
表单 -> API -> 状态机 -> 外部系统 -> 状态机 -> API
到目前为止一切顺利。现在我想使用 SignalR 将状态更新发送到客户端。所以我在 API:
中实现了这个消费者
public class UpdatesConsumer :
IConsumer<IExternalSystemUpdateMessage>
{
private readonly IHubContext<UpdatesHub> _updaterHubContext;
public UpdatesConsumer(IHubContext<UpdatesHub> hubContext)
{
_updaterHubContext = hubContext;
}
public Task Consume(ConsumeContext<IExternalSystemUpdateMessage> context)
{
//return _updaterHubContext.Clients.Group(context.Message.CorrelationId.ToString()).SendAsync("SEND_UPDATE", context.Message.Message);// this.SendUpdate(context.Message.CorrelationId, context.Message.Message);
return _updaterHubContext.Clients.All.SendAsync("SEND_UPDATE", context.Message.Message);
}
}
这是我的 SignalR 中心:
public class UpdatesHub :
Hub
{
public Task SendUpdate(Guid correlationId, string message)
{
return Clients.Group(correlationId.ToString()).SendAsync("SEND_UPDATE", message);
}
}
这就是总线和消费者的实例化方式:
public void ConfigureServices(IServiceCollection services)
{
_services = services;
services.AddMvc();
services.AddSignalR();
//services.AddSingleton<IHubContext<UpdatesHub>>();
WebAPI.CreateBus();
}
public static IServiceCollection _services;
static IBusControl _busControl;
public static IBusControl Bus
{
get
{
return _busControl;
}
}
public static void CreateBus()
{
IRMQConnection rmqSettings = Config.GetRMQConnectionConfig("rmq-settings.json", "connection");
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(BusInitializer.GetUri("", rmqSettings), h =>
{
h.Username(rmqSettings.UserName);
h.Password(rmqSettings.Password);
});
x.ReceiveEndpoint(host, "externalsystems.update",
e => { e.Consumer(() => new UpdatesConsumer((IHubContext<UpdatesHub>)Startup.__serviceProvider.GetService(typeof(IHubContext<UpdatesHub>)))); });
});
TaskUtil.Await(() => _busControl.StartAsync());
}
============================================= ============================
所以问题是我的 Consumer class 中的 _updaterHubContext.Clients 总是空的。我测试了访问控制器中的集线器,客户端确实出现了:
public class TestController : Controller
{
private readonly IHubContext<UpdatesHub> _hubContext;
public TestController(IHubContext<UpdatesHub> hubContext)
{
_hubContext = hubContext;
}
[HttpGet]
[Route("api/Test/")]
public IActionResult Index()
{
return View();
}
}
如何在我的消费者 class 中获得正确的集线器实例?或者我如何访问 .net 正在使用的 IServiceCollection?
提前致谢!
为什么不使用 Microsoft Dependency Injection 注册总线。它应该可以解决您的问题,它将使用 IServiceProvider
解决您的消费者问题
您可以注册您的消费者,以便 MassTransit 使用 MassTransit.Extensions.DependencyInjection 包中提供的支持从 IServiceProvider
解决它。
x.ReceiveEndpoint(host, "externalsystems.update", e =>
{
e.Consumer<UpdatesConsumer>(_serviceProvider);
});
一定要在容器中注册您的 UpdatesConsumer
。这应该为端点上收到的每条消息解析一个新的消费者实例。
我的主要问题是获取正确的 SignalR 集线器实例。
上下文:我正在构建一个与几个外部系统通信的网络应用程序。我的应用程序中的 CRUD 操作导致更新外部系统的数据库。
在这个例子中我有 3 个服务 运行:
外部系统 |状态机 | .NET 核心网络API
当我 post 'create employee' 表单时,RabbitMQ 消息将从 WebAPI 发送到状态机。然后,状态机向更新数据库的外部系统服务发送几条创建消息。此后,它更新状态机以跟踪创建操作。
表单 -> API -> 状态机 -> 外部系统 -> 状态机 -> API
到目前为止一切顺利。现在我想使用 SignalR 将状态更新发送到客户端。所以我在 API:
中实现了这个消费者public class UpdatesConsumer :
IConsumer<IExternalSystemUpdateMessage>
{
private readonly IHubContext<UpdatesHub> _updaterHubContext;
public UpdatesConsumer(IHubContext<UpdatesHub> hubContext)
{
_updaterHubContext = hubContext;
}
public Task Consume(ConsumeContext<IExternalSystemUpdateMessage> context)
{
//return _updaterHubContext.Clients.Group(context.Message.CorrelationId.ToString()).SendAsync("SEND_UPDATE", context.Message.Message);// this.SendUpdate(context.Message.CorrelationId, context.Message.Message);
return _updaterHubContext.Clients.All.SendAsync("SEND_UPDATE", context.Message.Message);
}
}
这是我的 SignalR 中心:
public class UpdatesHub :
Hub
{
public Task SendUpdate(Guid correlationId, string message)
{
return Clients.Group(correlationId.ToString()).SendAsync("SEND_UPDATE", message);
}
}
这就是总线和消费者的实例化方式:
public void ConfigureServices(IServiceCollection services)
{
_services = services;
services.AddMvc();
services.AddSignalR();
//services.AddSingleton<IHubContext<UpdatesHub>>();
WebAPI.CreateBus();
}
public static IServiceCollection _services;
static IBusControl _busControl;
public static IBusControl Bus
{
get
{
return _busControl;
}
}
public static void CreateBus()
{
IRMQConnection rmqSettings = Config.GetRMQConnectionConfig("rmq-settings.json", "connection");
_busControl = MassTransit.Bus.Factory.CreateUsingRabbitMq(x =>
{
var host = x.Host(BusInitializer.GetUri("", rmqSettings), h =>
{
h.Username(rmqSettings.UserName);
h.Password(rmqSettings.Password);
});
x.ReceiveEndpoint(host, "externalsystems.update",
e => { e.Consumer(() => new UpdatesConsumer((IHubContext<UpdatesHub>)Startup.__serviceProvider.GetService(typeof(IHubContext<UpdatesHub>)))); });
});
TaskUtil.Await(() => _busControl.StartAsync());
}
============================================= ============================
所以问题是我的 Consumer class 中的 _updaterHubContext.Clients 总是空的。我测试了访问控制器中的集线器,客户端确实出现了:
public class TestController : Controller
{
private readonly IHubContext<UpdatesHub> _hubContext;
public TestController(IHubContext<UpdatesHub> hubContext)
{
_hubContext = hubContext;
}
[HttpGet]
[Route("api/Test/")]
public IActionResult Index()
{
return View();
}
}
如何在我的消费者 class 中获得正确的集线器实例?或者我如何访问 .net 正在使用的 IServiceCollection?
提前致谢!
为什么不使用 Microsoft Dependency Injection 注册总线。它应该可以解决您的问题,它将使用 IServiceProvider
您可以注册您的消费者,以便 MassTransit 使用 MassTransit.Extensions.DependencyInjection 包中提供的支持从 IServiceProvider
解决它。
x.ReceiveEndpoint(host, "externalsystems.update", e =>
{
e.Consumer<UpdatesConsumer>(_serviceProvider);
});
一定要在容器中注册您的 UpdatesConsumer
。这应该为端点上收到的每条消息解析一个新的消费者实例。