Azure WebJob 不向 Azure SignalR 服务发送 SignalR 消息
Azure WebJob not sending SignalR message to Azure SignalR service
我有一个 Asp.Net Core Api 3.1 和一个 Azure WebJob,两者都 运行 在 Azure 应用服务上运行。他们都需要发送通知并且都不会接收消息。分解:
我在每个应用程序指向的云中有一个 Azure SignalR 服务实例。
我创建的 Hub class 位于 Api 和 WebJob 项目都引用的库中。
客户端将仅通过 Api.
连接到集线器
当 Api 向连接发送消息时一切正常,但 WebJob 没有。
我真的不想 运行 WebJob 作为客户端,因为那样我就必须处理身份验证。我只是希望它 运行 作为同一个集线器的另一个实例,它将消息发送到 API 所做的相同组。此外,此 WebJob 不适合 运行 作为 Azure 函数,因为它花费的时间太长。
我在配置 WebJob 时遗漏了一些东西,因为它似乎没有连接到 Azure SignalR。
当 WebJob 尝试发送消息时,出现以下错误:(我还没有发布到 Azure,所以这一切都发生在我的本地机器上)
Microsoft.Azure.SignalR.ServiceLifetimeManager[100]
无法发送消息(空)。
Microsoft.Azure.SignalR.Common.AzureSignalRNotConnectedException: Azure SignalR 服务尚未连接,请稍后重试。
在 Microsoft.Azure.SignalR.ServiceConnectionManager1.WriteAsync(ServiceMessage serviceMessage) at Microsoft.Azure.SignalR.ServiceLifetimeManagerBase
1.<>c__DisplayClass22_01.<WriteAsync>b__0(T m) at Microsoft.Azure.SignalR.ServiceLifetimeManagerBase
1.WriteCoreAsync[T](T 消息,Func`2 任务)
WebJob Main:
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MyProject.Data;
using MyProject.Hub;
using Microsoft.Azure.KeyVault;
using Microsoft.Azure.Services.AppAuthentication;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Configuration.AzureKeyVault;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
public static IConfigurationRoot Configuration { get; protected set; }
static async Task Main(string[] args)
{
string buildConfig = "Development";
var builder = new HostBuilder()
.ConfigureWebJobs(b =>
{
b.AddAzureStorageCoreServices();
b.AddTimers();
})
.ConfigureAppConfiguration(config =>
{
config.AddJsonFile($"appsettings.{buildConfig}.json", true, true);
Configuration = config.Build();
})
.ConfigureServices((hostContext, services) =>
{
services.AddSignalR().AddAzureSignalR(options =>
{
options.ConnectionString = Configuration["Azure:SignalR:ConnectionString"];
});
services.AddHostedService<ApplicationHostService>();
services.AddDbContext<MyDbContext>(options => options.UseSqlServer(Configuration.GetConnectionString("MyDatabase")));
services.AddScoped<MyDbContext>();
**// The NotificationService expects an injected MyDbContext and IHubContext<NotificationHub>**
services.AddScoped<INotificationService, NotificationService>();
})
.ConfigureLogging((context, b) =>
{
b.AddConsole();
});
var host = builder.Build();
using (host)
{
host.Run();
}
}
}
ApplicationHostService:
public class ApplicationHostService : IHostedService
{
readonly ILogger<ApplicationHostService> _logger;
readonly IConfiguration _configuration;
readonly IHostingEnvironment _hostingEnvironment;
public ApplicationHostService(
ILogger<ApplicationHostService> logger,
IConfiguration configuration,
IHostingEnvironment hostingEnvironment
)
{
_logger = logger;
_configuration = configuration;
_hostingEnvironment = hostingEnvironment;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await Task.CompletedTask;
_logger.LogWarning("Application Host Service started.....");
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogWarning("Application Host Service stopped.....");
await Task.CompletedTask;
}
}
WebJob 触发代码:
public class MyImportTrigger
{
private INotificationService _notificationService;
public MyImportTrigger(INotificationService notificationService)
{
_notificationService = notificationService;
}
public async Task Run([TimerTrigger("0 */1 * * * *" )] TimerInfo myTimer, ILogger log)
{
....bunch of non-relevant removed code for brevity....
await _notificationService.CreateFundImportNotificationAsync(upload);
....bunch of non-relevant removed code for brevity....
}
}
通知服务:
using ProjectName.Auth;
using ProjectName.Data;
using ProjectName.Utils;
using Microsoft.AspNetCore.SignalR;
using SignalR.Mvc;
using System;
using System.ComponentModel;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
namespace ProjectName.Hub
{
public interface INotificationService
{
FundPublishNotification CreateFundPublishNotification(short quarter, short year, string userName);
}
public class NotificationService : INotificationService
{
MyDbContext _context;
IHubContext<NotificationHub> _hubContext;
public NotificationService(MyDbContext context, IHubContext<NotificationHub> hubContext)
{
_context = context;
_hubContext = hubContext;
}
public FundPublishNotification CreateFundPublishNotification(short quarter, short year, string userName)
{
*** removed: do some processing and db persistence to create an object called "notif" ***
**** THIS IS WHERE IT BOMBS WHEN CALLED FROM WEBJOB, BUT DOESN'T WHEN CALLED FROM API******
** the roles value is retrieved from the db and isn't dependent on a current user **
_hubContext.Clients.Groups(roles).SendAsync("NewNotification", notif);
return notif;
}
}
}
枢纽class:
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
using MyProject.Auth;
namespace SignalR.Mvc
{
[Authorize]
public class NotificationHub : Hub
{
public NotificationHub()
{ }
public override async Task OnConnectedAsync()
{
await AddConnectionToGroups();
await base.OnConnectedAsync();
}
public async Task AddConnectionToGroups()
{
var roles = Context.User.Roles();
foreach (RoleValues role in roles)
{
await Groups.AddToGroupAsync(Context.ConnectionId, role.ToString());
}
}
public override async Task OnDisconnectedAsync(Exception exception)
{
await RemoveConnectionToGroups();
await base.OnDisconnectedAsync(exception);
}
public async Task RemoveConnectionToGroups()
{
var roles = Context.User.Roles();
foreach (RoleValues role in roles)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, role.ToString());
}
}
}
}
appsettings.json:
{
"Azure": {
"SignalR": {
"ConnectionString": "Endpoint=https://myproject-signalr-dev.service.signalr.net;AccessKey=removed-value-before-posting;Version=1.0;"
}
}
}
包版本:
<PackageReference Include="Azure.Security.KeyVault.Keys" Version="4.1.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.6.0" />
<PackageReference Include="Microsoft.AspNetCore.SignalR" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.SignalR" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.SignalR.Management" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.23" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.SignalRService" Version="1.2.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.AzureKeyVault" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.8" />
好吧,我想通了,所以在这里发布了解决方案的相关部分。
使用通过以下包提供的 ServiceManagerBuilder:
using Microsoft.Azure.SignalR.Management;
运行 任务:
public class ImportTrigger
{
private INotificationService _notificationService;
private IConfiguration _config;
private ILogger<ImportTrigger> _logger;
public ImportTrigger(INotificationService notificationService, IConfiguration configuration, ILoggerFactory loggerFactory)
{
_notificationService = notificationService;
_config = configuration;
_logger = loggerFactory.CreateLogger<ImportTrigger>();
}
public async Task Run([TimerTrigger("%CRONTIME%" )] TimerInfo myTimer)
{
... bunch of removed code for brevity ...
try
{
var (importNotif, roles) = _notificationService.CreateFundImportNotificationAsync(upload);
using (var hubServiceManager = new ServiceManagerBuilder().WithOptions(option =>
{
option.ConnectionString = _config["Azure:SignalR:ConnectionString"];
option.ServiceTransportType = ServiceTransportType.Persistent;
}).Build())
{
var hubContext = await hubServiceManager.CreateHubContextAsync("NotificationHub");
await hubContext.Clients.Groups(roles.Select(r => r.ToString()).ToImmutableList<string>()).SendAsync("NewNotification", importNotif.ToModel());
}
}
catch { }
... bunch of removed code for brevity ...
}
我有一个 Asp.Net Core Api 3.1 和一个 Azure WebJob,两者都 运行 在 Azure 应用服务上运行。他们都需要发送通知并且都不会接收消息。分解:
我在每个应用程序指向的云中有一个 Azure SignalR 服务实例。
我创建的 Hub class 位于 Api 和 WebJob 项目都引用的库中。
客户端将仅通过 Api.
连接到集线器当 Api 向连接发送消息时一切正常,但 WebJob 没有。
我真的不想 运行 WebJob 作为客户端,因为那样我就必须处理身份验证。我只是希望它 运行 作为同一个集线器的另一个实例,它将消息发送到 API 所做的相同组。此外,此 WebJob 不适合 运行 作为 Azure 函数,因为它花费的时间太长。
我在配置 WebJob 时遗漏了一些东西,因为它似乎没有连接到 Azure SignalR。
当 WebJob 尝试发送消息时,出现以下错误:(我还没有发布到 Azure,所以这一切都发生在我的本地机器上)
Microsoft.Azure.SignalR.ServiceLifetimeManager[100]
无法发送消息(空)。
Microsoft.Azure.SignalR.Common.AzureSignalRNotConnectedException: Azure SignalR 服务尚未连接,请稍后重试。
在 Microsoft.Azure.SignalR.ServiceConnectionManager1.WriteAsync(ServiceMessage serviceMessage) at Microsoft.Azure.SignalR.ServiceLifetimeManagerBase
1.<>c__DisplayClass22_01.<WriteAsync>b__0(T m) at Microsoft.Azure.SignalR.ServiceLifetimeManagerBase
1.WriteCoreAsync[T](T 消息,Func`2 任务)
WebJob Main:
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MyProject.Data;
using MyProject.Hub;
using Microsoft.Azure.KeyVault;
using Microsoft.Azure.Services.AppAuthentication;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Configuration.AzureKeyVault;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
public static IConfigurationRoot Configuration { get; protected set; }
static async Task Main(string[] args)
{
string buildConfig = "Development";
var builder = new HostBuilder()
.ConfigureWebJobs(b =>
{
b.AddAzureStorageCoreServices();
b.AddTimers();
})
.ConfigureAppConfiguration(config =>
{
config.AddJsonFile($"appsettings.{buildConfig}.json", true, true);
Configuration = config.Build();
})
.ConfigureServices((hostContext, services) =>
{
services.AddSignalR().AddAzureSignalR(options =>
{
options.ConnectionString = Configuration["Azure:SignalR:ConnectionString"];
});
services.AddHostedService<ApplicationHostService>();
services.AddDbContext<MyDbContext>(options => options.UseSqlServer(Configuration.GetConnectionString("MyDatabase")));
services.AddScoped<MyDbContext>();
**// The NotificationService expects an injected MyDbContext and IHubContext<NotificationHub>**
services.AddScoped<INotificationService, NotificationService>();
})
.ConfigureLogging((context, b) =>
{
b.AddConsole();
});
var host = builder.Build();
using (host)
{
host.Run();
}
}
}
ApplicationHostService:
public class ApplicationHostService : IHostedService
{
readonly ILogger<ApplicationHostService> _logger;
readonly IConfiguration _configuration;
readonly IHostingEnvironment _hostingEnvironment;
public ApplicationHostService(
ILogger<ApplicationHostService> logger,
IConfiguration configuration,
IHostingEnvironment hostingEnvironment
)
{
_logger = logger;
_configuration = configuration;
_hostingEnvironment = hostingEnvironment;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await Task.CompletedTask;
_logger.LogWarning("Application Host Service started.....");
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogWarning("Application Host Service stopped.....");
await Task.CompletedTask;
}
}
WebJob 触发代码:
public class MyImportTrigger
{
private INotificationService _notificationService;
public MyImportTrigger(INotificationService notificationService)
{
_notificationService = notificationService;
}
public async Task Run([TimerTrigger("0 */1 * * * *" )] TimerInfo myTimer, ILogger log)
{
....bunch of non-relevant removed code for brevity....
await _notificationService.CreateFundImportNotificationAsync(upload);
....bunch of non-relevant removed code for brevity....
}
}
通知服务:
using ProjectName.Auth;
using ProjectName.Data;
using ProjectName.Utils;
using Microsoft.AspNetCore.SignalR;
using SignalR.Mvc;
using System;
using System.ComponentModel;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
namespace ProjectName.Hub
{
public interface INotificationService
{
FundPublishNotification CreateFundPublishNotification(short quarter, short year, string userName);
}
public class NotificationService : INotificationService
{
MyDbContext _context;
IHubContext<NotificationHub> _hubContext;
public NotificationService(MyDbContext context, IHubContext<NotificationHub> hubContext)
{
_context = context;
_hubContext = hubContext;
}
public FundPublishNotification CreateFundPublishNotification(short quarter, short year, string userName)
{
*** removed: do some processing and db persistence to create an object called "notif" ***
**** THIS IS WHERE IT BOMBS WHEN CALLED FROM WEBJOB, BUT DOESN'T WHEN CALLED FROM API******
** the roles value is retrieved from the db and isn't dependent on a current user **
_hubContext.Clients.Groups(roles).SendAsync("NewNotification", notif);
return notif;
}
}
}
枢纽class:
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.SignalR;
using MyProject.Auth;
namespace SignalR.Mvc
{
[Authorize]
public class NotificationHub : Hub
{
public NotificationHub()
{ }
public override async Task OnConnectedAsync()
{
await AddConnectionToGroups();
await base.OnConnectedAsync();
}
public async Task AddConnectionToGroups()
{
var roles = Context.User.Roles();
foreach (RoleValues role in roles)
{
await Groups.AddToGroupAsync(Context.ConnectionId, role.ToString());
}
}
public override async Task OnDisconnectedAsync(Exception exception)
{
await RemoveConnectionToGroups();
await base.OnDisconnectedAsync(exception);
}
public async Task RemoveConnectionToGroups()
{
var roles = Context.User.Roles();
foreach (RoleValues role in roles)
{
await Groups.RemoveFromGroupAsync(Context.ConnectionId, role.ToString());
}
}
}
}
appsettings.json:
{
"Azure": {
"SignalR": {
"ConnectionString": "Endpoint=https://myproject-signalr-dev.service.signalr.net;AccessKey=removed-value-before-posting;Version=1.0;"
}
}
}
包版本:
<PackageReference Include="Azure.Security.KeyVault.Keys" Version="4.1.0" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.6.0" />
<PackageReference Include="Microsoft.AspNetCore.SignalR" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.SignalR" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.SignalR.Management" Version="1.6.0" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.23" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="4.0.1" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Http" Version="3.0.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.SignalRService" Version="1.2.2" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.AzureKeyVault" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.8" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.8" />
好吧,我想通了,所以在这里发布了解决方案的相关部分。
使用通过以下包提供的 ServiceManagerBuilder:
using Microsoft.Azure.SignalR.Management;
运行 任务:
public class ImportTrigger
{
private INotificationService _notificationService;
private IConfiguration _config;
private ILogger<ImportTrigger> _logger;
public ImportTrigger(INotificationService notificationService, IConfiguration configuration, ILoggerFactory loggerFactory)
{
_notificationService = notificationService;
_config = configuration;
_logger = loggerFactory.CreateLogger<ImportTrigger>();
}
public async Task Run([TimerTrigger("%CRONTIME%" )] TimerInfo myTimer)
{
... bunch of removed code for brevity ...
try
{
var (importNotif, roles) = _notificationService.CreateFundImportNotificationAsync(upload);
using (var hubServiceManager = new ServiceManagerBuilder().WithOptions(option =>
{
option.ConnectionString = _config["Azure:SignalR:ConnectionString"];
option.ServiceTransportType = ServiceTransportType.Persistent;
}).Build())
{
var hubContext = await hubServiceManager.CreateHubContextAsync("NotificationHub");
await hubContext.Clients.Groups(roles.Select(r => r.ToString()).ToImmutableList<string>()).SendAsync("NewNotification", importNotif.ToModel());
}
}
catch { }
... bunch of removed code for brevity ...
}