Blazor + SQLTableDependency + SignalR:从 OnChange 事件通知特定组
Blazor + SQLTableDependency + SignalR: Notify specific groups from OnChange event
我有一个 Blazor 应用程序,它使用 SQLTableDependency 来检测数据库更改,然后通过 SignalR 将更改通知所有客户端。这可行,但我需要一种能够检测更改并仅通知特定 SignalR 组的方法。因为 SQLTableDependency 不关心谁在数据库中插入、更改或删除了记录,所以我不确定如何知道哪个组也发送更新。请参阅下文以了解有关我的应用程序以及我正在努力完成的工作的更多详细信息。
我们为每个客户建立了一个新组织。一个组织有自己的资产列表,并且可以有多个用户。
Organization.cs
public class Organization
{
public int OrganizationId { get; set; }
public string OrganizationName { get; set; }
public List<Asset> Assets { get; set; }
public List<ApplicationUser> Users { get; set; }
public bool IsDisabled { get; set; }
}
Asset.cs
public class Asset
{
public int AssetId { get; set; }
public string SerialNumber { get; set; }
public int OrganizationId { get; set; }
public virtual Organization Organization { get; set; }
public DateTime DateAdded { get; set; }
}
ApplicationUser.cs
public class ApplicationUser
{
public string FirstName { get; set; }
public string LastName { get; set; }
public int OrganizationId { get; set; }
public virtual Organization Organization { get; set; }
public List<Connection> Connections { get; set; }
public string Timezone { get; set; }
}
Connection.cs - 我将每个 SignalR 连接存储在数据库中。
public class Connection
{
public string ConnectionId { get; set; }
public string UserName { get; set; }
public bool Connected { get; set; }
public string Group { get; set; }
public DateTime ConnectionTime { get; set; }
}
AssetService.cs
public class AssetService : IAssetService
{
private readonly IServiceScopeFactory _serviceScopeFactory;
public AssetService(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public async Task<Asset> AddAssetAsync(Asset asset, string currentUserName)
{
try
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var db = scope.ServiceProvider.GetService<DataContext>();
if (asset.Device != null)
{
db.Entry(asset.Device).State = EntityState.Modified;
}
asset.DateAdded = DateTime.UtcNow;
await db.Assets.AddAsync(asset);
await db.SaveChangesAsync();
return asset;
}
}
catch (System.Exception ex)
{
throw ex;
}
}
}
AssetHub.cs - SignalR 中心
public class ChatHub : Hub
{
private readonly UserManager<ApplicationUser> _userManager;
private readonly IServiceScopeFactory _serviceScopeFactory;
public ChatHub(UserManager<ApplicationUser> userManager, IServiceScopeFactory serviceScopeFactory)
{
_userManager = userManager;
_serviceScopeFactory = serviceScopeFactory;
}
public async Task SendAssetToGroup(string userName, string location, Asset asset)
{
if (!string.IsNullOrWhiteSpace(userName))
{
var user = await _userManager.Users.Include(x => x.Connections).SingleAsync(x => x.UserName == userName);
if (user != null)
{
var group = $"{user.AccountId}-{location}";
await Clients.Group(group).SendAsync("AssetUpdate", user.Email, asset);
}
}
}
public override async Task OnConnectedAsync()
{
var httpContext = Context.GetHttpContext();
var location = httpContext.Request.Query["location"];
using (var scope = _serviceScopeFactory.CreateScope())
{
var db = scope.ServiceProvider.GetService<ApplicationDbContext>();
if (!string.IsNullOrWhiteSpace(userName))
{
var user = await db.Users.Include(x => x.Connections).SingleAsync(x => x.UserName == httpContext.User.Identity.Name);
if (user != null)
{
var group = $"{user.OrganizationId}-{location}";
var connection = new Connection { Connected = true, ConnectionId = Context.ConnectionId, Group = group, UserName = user.UserName };
await Groups.AddToGroupAsync(connection.ConnectionId, group);
user.Connections.Add(connection);
db.Users.Update(user);
}
}
await db.SaveChangesAsync();
}
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception exception)
{
if (!string.IsNullOrWhiteSpace(Context.ConnectionId))
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var db = scope.ServiceProvider.GetService<ApplicationDbContext>();
var connection = await db.Connections.Where(x => x.ConnectionId ==
Context.ConnectionId).FirstOrDefaultAsync();
if (connection != null)
{
await Groups.RemoveFromGroupAsync(connection.ConnectionId, connection.Group);
db.Connections.Remove(connection);
await db.SaveChangesAsync();
}
}
}
await base.OnDisconnectedAsync(exception);
}
}
AssetTableChangeService.cs - 这是我需要帮助的地方。当 SQLTableDependency 检测到对资产 table 的更改时,我需要能够调用 AssetHub 中的 SendAssetToGroup 方法。由于用户属于不同的组织,我不想将更新推送到所有组织,我只想将更新发送给特定组织组中的用户。
public class AssetTableChangeService : IAssetTableChangeService
{
private const string TableName = "Assets";
private SqlTableDependency<Asset> _notifier;
private IConfiguration _configuration;
public event AssetChangeDelegate OnAssetChanged;
public StockTableChangeService(IConfiguration configuration)
{
_configuration = configuration;
// SqlTableDependency will trigger an event
// for any record change on monitored table
_notifier = new SqlTableDependency<Asset>(
_configuration.GetConnectionString("DefaultConnection"),
TableName);
_notifier.OnChanged += AssetChanged;
_notifier.Start();
}
private void AssetChanged(object sender, RecordChangedEventArgs<Asset> e)
{
OnAssetChanged.Invoke(this, new AssetChangeEventArgs(e.Entity, e.EntityOldValues));
}
public void Dispose()
{
_notifier.Stop();
_notifier.Dispose();
}
所以流程应该是这样的。
- 用户登录-通过SignalR建立连接
- 连接信息存储在数据库中。
- 根据用户连接的页面和 OrganizationId,将连接添加到 SignalR 组。
- 用户从 UI 创建新资产。
- 在资产服务中调用了 AddAsset 方法。
- 资产被插入到数据库中。
- SQLTableDependency 检测到更改,然后调用 AssetChanged 处理程序方法。
- AssetChanged 处理程序方法调用 OnAssetChanged 事件。
- AssetHub需要订阅OnAssetChanged事件
- 当触发 OnAssetChanged 事件时,AssetHub 中的处理程序方法需要调用 SendAssetToGroup 方法。
- 当用户从“资产”页面导航到另一个页面时,将从数据库中删除 SignalR 连接,并从组中删除该连接。
在第 9 步和第 10 步之前,我已经完成了所有工作。无论如何,是否可以使这成为可能,因为 SQLTableDependency 不关心谁进行了更改,所以我无法查找更新所在的连接组也需要推。有什么想法吗?
当 UI 与 class 一起工作时,例如:Student
UI 成员加入了名为“Student”或“BlahNamespace.Student”的组。
如果它只是一个名称列表,如果它是一个实体,则将名称和另一个组与 ID 作为字符串连接在一起
"BlahNamespace.Student:201" 在你的情况下,如果数据库从实体中知道这一点,你也可以附加组织的名称以获得更细的粒度。
服务器可以根据操作情况,根据需要通知群组。
我将集线器注入 API 控制器以实现此目的。
就我个人而言,我不会使用 signalr 服务来传输数据,保持它的轻量级,只是“发出”变化的信号。然后客户可以决定如何处理。这样一来,数据只能通过 API 以一种方式访问,并具有所有已配置的安全性。
我有一个 Blazor 应用程序,它使用 SQLTableDependency 来检测数据库更改,然后通过 SignalR 将更改通知所有客户端。这可行,但我需要一种能够检测更改并仅通知特定 SignalR 组的方法。因为 SQLTableDependency 不关心谁在数据库中插入、更改或删除了记录,所以我不确定如何知道哪个组也发送更新。请参阅下文以了解有关我的应用程序以及我正在努力完成的工作的更多详细信息。
我们为每个客户建立了一个新组织。一个组织有自己的资产列表,并且可以有多个用户。
Organization.cs
public class Organization
{
public int OrganizationId { get; set; }
public string OrganizationName { get; set; }
public List<Asset> Assets { get; set; }
public List<ApplicationUser> Users { get; set; }
public bool IsDisabled { get; set; }
}
Asset.cs
public class Asset
{
public int AssetId { get; set; }
public string SerialNumber { get; set; }
public int OrganizationId { get; set; }
public virtual Organization Organization { get; set; }
public DateTime DateAdded { get; set; }
}
ApplicationUser.cs
public class ApplicationUser
{
public string FirstName { get; set; }
public string LastName { get; set; }
public int OrganizationId { get; set; }
public virtual Organization Organization { get; set; }
public List<Connection> Connections { get; set; }
public string Timezone { get; set; }
}
Connection.cs - 我将每个 SignalR 连接存储在数据库中。
public class Connection
{
public string ConnectionId { get; set; }
public string UserName { get; set; }
public bool Connected { get; set; }
public string Group { get; set; }
public DateTime ConnectionTime { get; set; }
}
AssetService.cs
public class AssetService : IAssetService
{
private readonly IServiceScopeFactory _serviceScopeFactory;
public AssetService(IServiceScopeFactory serviceScopeFactory)
{
_serviceScopeFactory = serviceScopeFactory;
}
public async Task<Asset> AddAssetAsync(Asset asset, string currentUserName)
{
try
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var db = scope.ServiceProvider.GetService<DataContext>();
if (asset.Device != null)
{
db.Entry(asset.Device).State = EntityState.Modified;
}
asset.DateAdded = DateTime.UtcNow;
await db.Assets.AddAsync(asset);
await db.SaveChangesAsync();
return asset;
}
}
catch (System.Exception ex)
{
throw ex;
}
}
}
AssetHub.cs - SignalR 中心
public class ChatHub : Hub
{
private readonly UserManager<ApplicationUser> _userManager;
private readonly IServiceScopeFactory _serviceScopeFactory;
public ChatHub(UserManager<ApplicationUser> userManager, IServiceScopeFactory serviceScopeFactory)
{
_userManager = userManager;
_serviceScopeFactory = serviceScopeFactory;
}
public async Task SendAssetToGroup(string userName, string location, Asset asset)
{
if (!string.IsNullOrWhiteSpace(userName))
{
var user = await _userManager.Users.Include(x => x.Connections).SingleAsync(x => x.UserName == userName);
if (user != null)
{
var group = $"{user.AccountId}-{location}";
await Clients.Group(group).SendAsync("AssetUpdate", user.Email, asset);
}
}
}
public override async Task OnConnectedAsync()
{
var httpContext = Context.GetHttpContext();
var location = httpContext.Request.Query["location"];
using (var scope = _serviceScopeFactory.CreateScope())
{
var db = scope.ServiceProvider.GetService<ApplicationDbContext>();
if (!string.IsNullOrWhiteSpace(userName))
{
var user = await db.Users.Include(x => x.Connections).SingleAsync(x => x.UserName == httpContext.User.Identity.Name);
if (user != null)
{
var group = $"{user.OrganizationId}-{location}";
var connection = new Connection { Connected = true, ConnectionId = Context.ConnectionId, Group = group, UserName = user.UserName };
await Groups.AddToGroupAsync(connection.ConnectionId, group);
user.Connections.Add(connection);
db.Users.Update(user);
}
}
await db.SaveChangesAsync();
}
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception exception)
{
if (!string.IsNullOrWhiteSpace(Context.ConnectionId))
{
using (var scope = _serviceScopeFactory.CreateScope())
{
var db = scope.ServiceProvider.GetService<ApplicationDbContext>();
var connection = await db.Connections.Where(x => x.ConnectionId ==
Context.ConnectionId).FirstOrDefaultAsync();
if (connection != null)
{
await Groups.RemoveFromGroupAsync(connection.ConnectionId, connection.Group);
db.Connections.Remove(connection);
await db.SaveChangesAsync();
}
}
}
await base.OnDisconnectedAsync(exception);
}
}
AssetTableChangeService.cs - 这是我需要帮助的地方。当 SQLTableDependency 检测到对资产 table 的更改时,我需要能够调用 AssetHub 中的 SendAssetToGroup 方法。由于用户属于不同的组织,我不想将更新推送到所有组织,我只想将更新发送给特定组织组中的用户。
public class AssetTableChangeService : IAssetTableChangeService
{
private const string TableName = "Assets";
private SqlTableDependency<Asset> _notifier;
private IConfiguration _configuration;
public event AssetChangeDelegate OnAssetChanged;
public StockTableChangeService(IConfiguration configuration)
{
_configuration = configuration;
// SqlTableDependency will trigger an event
// for any record change on monitored table
_notifier = new SqlTableDependency<Asset>(
_configuration.GetConnectionString("DefaultConnection"),
TableName);
_notifier.OnChanged += AssetChanged;
_notifier.Start();
}
private void AssetChanged(object sender, RecordChangedEventArgs<Asset> e)
{
OnAssetChanged.Invoke(this, new AssetChangeEventArgs(e.Entity, e.EntityOldValues));
}
public void Dispose()
{
_notifier.Stop();
_notifier.Dispose();
}
所以流程应该是这样的。
- 用户登录-通过SignalR建立连接
- 连接信息存储在数据库中。
- 根据用户连接的页面和 OrganizationId,将连接添加到 SignalR 组。
- 用户从 UI 创建新资产。
- 在资产服务中调用了 AddAsset 方法。
- 资产被插入到数据库中。
- SQLTableDependency 检测到更改,然后调用 AssetChanged 处理程序方法。
- AssetChanged 处理程序方法调用 OnAssetChanged 事件。
- AssetHub需要订阅OnAssetChanged事件
- 当触发 OnAssetChanged 事件时,AssetHub 中的处理程序方法需要调用 SendAssetToGroup 方法。
- 当用户从“资产”页面导航到另一个页面时,将从数据库中删除 SignalR 连接,并从组中删除该连接。
在第 9 步和第 10 步之前,我已经完成了所有工作。无论如何,是否可以使这成为可能,因为 SQLTableDependency 不关心谁进行了更改,所以我无法查找更新所在的连接组也需要推。有什么想法吗?
当 UI 与 class 一起工作时,例如:Student
UI 成员加入了名为“Student”或“BlahNamespace.Student”的组。 如果它只是一个名称列表,如果它是一个实体,则将名称和另一个组与 ID 作为字符串连接在一起 "BlahNamespace.Student:201" 在你的情况下,如果数据库从实体中知道这一点,你也可以附加组织的名称以获得更细的粒度。
服务器可以根据操作情况,根据需要通知群组。
我将集线器注入 API 控制器以实现此目的。
就我个人而言,我不会使用 signalr 服务来传输数据,保持它的轻量级,只是“发出”变化的信号。然后客户可以决定如何处理。这样一来,数据只能通过 API 以一种方式访问,并具有所有已配置的安全性。