Mass Transit - 在 Web App 中生成消息并在 Azure Function 中使用
Mass Transit - Produce messages in Web App and consume in Azure Function
我有一个通过 Azure 服务总线发送消息并等待回复的 Web 应用程序:
var response = await this.createClient.GetResponse<Models.Response<CommandResponse>>(message, cancellationToken);
我有一个 Azure 函数和一个使用消息的服务总线触发器:
[FunctionName("CommandHandler")]
public Task HandleCommandAsync(
[ServiceBusTrigger("input-queue", Connection = "AzureWebJobsServiceBus"), ServiceBusAccount("ServiceBus")]
Message message,
IBinder binder,
ILogger logger,
CancellationToken cancellationToken)
{
logger.LogInformation("Command Handler Function Invoked.");
var result = System.Text.Encoding.UTF8.GetString(message.Body);
var d = JsonConvert.DeserializeObject<dynamic>(result);
string messageType = d.message.messageType.Value;
var handler = Bus.Factory.CreateBrokeredMessageReceiver(
binder,
cfg =>
{
cfg.CancellationToken = cancellationToken;
cfg.SetLog(logger);
cfg.InputAddress = new Uri($"{this.secrets.Value.ServiceBusUri}/input-queue");
cfg.UseRetry(x => x.Intervals(10, 100, 500, 1000));
cfg.Consumer(() => this.customerConsumer);
});
var handlerResult = handler.Handle(message);
logger.LogInformation("Command Handler Function Completed.");
return handlerResult;
}
为了从网络应用程序发送消息,我必须配置消费者。我不确定为什么这是必要的,因为 Web 应用程序永远不需要直接引用消费者,但如果没有以下代码,则不会发送任何消息。
services.AddMassTransit(
x =>
{
x.AddConsumer<CustomerConsumer>();
x.AddBus(
provider => Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
var host = cfg.Host(secrets.AzureWebJobsServiceBus, h => { });
cfg.ReceiveEndpoint(
host,
"input-queue",
ep =>
{
ep.ConfigureConsumer<CustomerConsumer>(provider);
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
});
}));
x.AddRequestClient<RegisterNewCustomerCommand>();
});
问题是消息发送后,有时 CommandHandler
函数应用会触发并调用消费者的Handle
消息。
但是 有时 Web 应用程序会无意中直接调用消费者(因为消费者在启动时注册并侦听队列中的消息)。
这对我来说是不希望的。对于可伸缩性,只有 Azure Functions 应该调用消费者。此外,消费者注入的依赖项仅在函数应用程序的 WebHostStartUp
中注册,因此如果直接从 Web 应用程序调用,消费者会出错。
问题:如何打破 Web 应用程序和消费者之间的依赖关系,使它们永远不会被 Web 应用程序直接调用,而总是被函数应用程序触发器调用?有什么方法可以避免 Adding/Configuring Web App 的 StartUp 方法中的消费者,同时仍然允许 Web App 通过 GetResponse
发送消息?
更新 - 解决方案
ASP.NET Core 2.2 Startup 不需要对消费者有任何引用(除非与我不同,您希望您的 Web 应用程序也使用消息)。
您需要添加服务总线 URL,包括队列路径到您的 AddRequestClient
。查看已接受的答案。
这对我有用。
services.AddMassTransit(
x =>
{
x.AddBus(
provider => Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
cfg.Host(
secrets.AzureWebJobsServiceBus,
h => { h.TransportType = TransportType.Amqp; });
}));
var serviceBusUri = new Uri($"{settings.ServiceBusUri}/input-queue");
x.AddRequestClient<RegisterNewCustomerCommand>
});
}
您的客户端不需要消费者 API。我唯一能想到的是你没有正确设置拓扑。
- 事先确保队列存在,因为函数不会创建队列。
- 配置队列的 URI 地址,以便您的网站 API 可以在
AddRequestClient
调用中指定它作为参数 (_sb://host..../input-queue ).
- 盈利!
由于您配置请求客户端的方式,很可能它使用的是发布而不是发送,因为它不知道队列地址。在这种情况下,将消费者添加到您的网络 API 对该主题进行了适当的订阅,以将消息转发到队列。您可以使用 Service Bus Explorer 之类的东西来查看它是如何在云中布局的。
将请求客户端的 URI 添加到客户端应用程序应该可以解决问题,并直接将命令发送到队列。
我有一个通过 Azure 服务总线发送消息并等待回复的 Web 应用程序:
var response = await this.createClient.GetResponse<Models.Response<CommandResponse>>(message, cancellationToken);
我有一个 Azure 函数和一个使用消息的服务总线触发器:
[FunctionName("CommandHandler")]
public Task HandleCommandAsync(
[ServiceBusTrigger("input-queue", Connection = "AzureWebJobsServiceBus"), ServiceBusAccount("ServiceBus")]
Message message,
IBinder binder,
ILogger logger,
CancellationToken cancellationToken)
{
logger.LogInformation("Command Handler Function Invoked.");
var result = System.Text.Encoding.UTF8.GetString(message.Body);
var d = JsonConvert.DeserializeObject<dynamic>(result);
string messageType = d.message.messageType.Value;
var handler = Bus.Factory.CreateBrokeredMessageReceiver(
binder,
cfg =>
{
cfg.CancellationToken = cancellationToken;
cfg.SetLog(logger);
cfg.InputAddress = new Uri($"{this.secrets.Value.ServiceBusUri}/input-queue");
cfg.UseRetry(x => x.Intervals(10, 100, 500, 1000));
cfg.Consumer(() => this.customerConsumer);
});
var handlerResult = handler.Handle(message);
logger.LogInformation("Command Handler Function Completed.");
return handlerResult;
}
为了从网络应用程序发送消息,我必须配置消费者。我不确定为什么这是必要的,因为 Web 应用程序永远不需要直接引用消费者,但如果没有以下代码,则不会发送任何消息。
services.AddMassTransit(
x =>
{
x.AddConsumer<CustomerConsumer>();
x.AddBus(
provider => Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
var host = cfg.Host(secrets.AzureWebJobsServiceBus, h => { });
cfg.ReceiveEndpoint(
host,
"input-queue",
ep =>
{
ep.ConfigureConsumer<CustomerConsumer>(provider);
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
});
}));
x.AddRequestClient<RegisterNewCustomerCommand>();
});
问题是消息发送后,有时 CommandHandler
函数应用会触发并调用消费者的Handle
消息。
但是 有时 Web 应用程序会无意中直接调用消费者(因为消费者在启动时注册并侦听队列中的消息)。
这对我来说是不希望的。对于可伸缩性,只有 Azure Functions 应该调用消费者。此外,消费者注入的依赖项仅在函数应用程序的 WebHostStartUp
中注册,因此如果直接从 Web 应用程序调用,消费者会出错。
问题:如何打破 Web 应用程序和消费者之间的依赖关系,使它们永远不会被 Web 应用程序直接调用,而总是被函数应用程序触发器调用?有什么方法可以避免 Adding/Configuring Web App 的 StartUp 方法中的消费者,同时仍然允许 Web App 通过 GetResponse
发送消息?
更新 - 解决方案
ASP.NET Core 2.2 Startup 不需要对消费者有任何引用(除非与我不同,您希望您的 Web 应用程序也使用消息)。
您需要添加服务总线 URL,包括队列路径到您的 AddRequestClient
。查看已接受的答案。
这对我有用。
services.AddMassTransit(
x =>
{
x.AddBus(
provider => Bus.Factory.CreateUsingAzureServiceBus(
cfg =>
{
cfg.Host(
secrets.AzureWebJobsServiceBus,
h => { h.TransportType = TransportType.Amqp; });
}));
var serviceBusUri = new Uri($"{settings.ServiceBusUri}/input-queue");
x.AddRequestClient<RegisterNewCustomerCommand>
});
}
您的客户端不需要消费者 API。我唯一能想到的是你没有正确设置拓扑。
- 事先确保队列存在,因为函数不会创建队列。
- 配置队列的 URI 地址,以便您的网站 API 可以在
AddRequestClient
调用中指定它作为参数 (_sb://host..../input-queue ). - 盈利!
由于您配置请求客户端的方式,很可能它使用的是发布而不是发送,因为它不知道队列地址。在这种情况下,将消费者添加到您的网络 API 对该主题进行了适当的订阅,以将消息转发到队列。您可以使用 Service Bus Explorer 之类的东西来查看它是如何在云中布局的。
将请求客户端的 URI 添加到客户端应用程序应该可以解决问题,并直接将命令发送到队列。