使用 Azure Functions 我的公共交通消息从未被成功消费
Using Azure Functions my Mass Transit messages are never successfully consumed
我有一个 Azure 函数项目,它向 Azure 服务总线队列发送一条消息。这行得通。我可以在 Azure 上看到传入的消息。
我有另一个 Azure 函数项目,然后接收并使用该消息。尽管服务总线触发器命中成功,但以下对消费者的调用总是中断。
启动看起来像这样:
namespace MassTransitTest.PushNotificationFunction
{
public class Startup : FunctionsStartup
{
private IConfigurationRoot config;
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddScoped<SendPushNotification>();
builder.Services.AddMassTransitForAzureFunctions(cfg =>
{
cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
});
}
}
}
我的消费者看起来像这样:
namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
public class EnergyBudgetExceededConsumer : IConsumer<EnergyBudgetExceeded>
{
public async Task Consume(ConsumeContext<EnergyBudgetExceeded> context)
{
LogContext.Debug?.Log("Successfully consumed message");
}
}
}
使用结构命名空间助手:
namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
public struct ConsumerNamespace
{
}
}
消费者使用以下接口:
namespace MassTransitTest.PushNotificationFunction.MessageModels
{
public interface EnergyBudgetExceeded
{
string TestProperty { get; }
}
}
接收函数如下所示:
public class SendPushNotification : BaseFunction
{
private readonly IMessageReceiver _receiver;
public SendPushNotification(IMessageReceiver receiver)
{
_receiver = receiver;
}
[FunctionName("EnergyBudgetExceeded")]
public Task EnergyBudgetExceeded([ServiceBusTrigger("test-queue")]
ServiceBusReceivedMessage message, CancellationToken cancellationToken)
{
return _receiver.HandleConsumer<EnergyBudgetExceededConsumer>("test-queue", message, cancellationToken);
}
}
上面的函数成功命中,但随后无法在“HandleConsumer”方法上调用消费者并抛出将消息放入 DL 队列的错误。
可以在此处查看失败消息的示例:
{
"messageId": "4c030000-306b-1065-1a55-08da035849d1",
"conversationId": "4c030000-306b-1065-3928-08da035849d7",
"sourceAddress": "sb://fakeurlservicebus.servicebus.windows.net/LAP16844_func_bus_joboyybopcegm7rhbdpygsnnyh?autodelete=300",
"destinationAddress": "sb://blainetestservicebus.servicebus.windows.net/test-queue",
"messageType": [
"urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded+BlaineTestMessage"
],
"message": {
"testProperty": "Energy budget"
},
"sentTime": "2022-03-11T12:11:35.6792405Z",
"headers": {},
"host": {
"machineName": "LAP-16844",
"processName": "func",
"processId": 46740,
"assembly": "func",
"assemblyVersion": "3.0.3904.0",
"frameworkVersion": "3.1.20",
"massTransitVersion": "7.3.1.0",
"operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
}
}
消费者看起来就像文档建议的那样在启动时被添加,我已经尝试通过名称显式引用它,但这也给出了同样的错误。
我是不是漏掉了什么明显的东西?
我是运行 Dot Net Core 3.1,并使用所有参考MassTransit库的V7.3.1。
出于某种原因,您生成的消息类型与消费者使用的消息类型不匹配:
{
"messageType": [
"urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded+BlaineTestMessage"
]
}
您生成的消息类型是否实现了该消息contract/interface?
我有一个 Azure 函数项目,它向 Azure 服务总线队列发送一条消息。这行得通。我可以在 Azure 上看到传入的消息。
我有另一个 Azure 函数项目,然后接收并使用该消息。尽管服务总线触发器命中成功,但以下对消费者的调用总是中断。
启动看起来像这样:
namespace MassTransitTest.PushNotificationFunction
{
public class Startup : FunctionsStartup
{
private IConfigurationRoot config;
public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddScoped<SendPushNotification>();
builder.Services.AddMassTransitForAzureFunctions(cfg =>
{
cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
});
}
}
}
我的消费者看起来像这样:
namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
public class EnergyBudgetExceededConsumer : IConsumer<EnergyBudgetExceeded>
{
public async Task Consume(ConsumeContext<EnergyBudgetExceeded> context)
{
LogContext.Debug?.Log("Successfully consumed message");
}
}
}
使用结构命名空间助手:
namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
public struct ConsumerNamespace
{
}
}
消费者使用以下接口:
namespace MassTransitTest.PushNotificationFunction.MessageModels
{
public interface EnergyBudgetExceeded
{
string TestProperty { get; }
}
}
接收函数如下所示:
public class SendPushNotification : BaseFunction
{
private readonly IMessageReceiver _receiver;
public SendPushNotification(IMessageReceiver receiver)
{
_receiver = receiver;
}
[FunctionName("EnergyBudgetExceeded")]
public Task EnergyBudgetExceeded([ServiceBusTrigger("test-queue")]
ServiceBusReceivedMessage message, CancellationToken cancellationToken)
{
return _receiver.HandleConsumer<EnergyBudgetExceededConsumer>("test-queue", message, cancellationToken);
}
}
上面的函数成功命中,但随后无法在“HandleConsumer”方法上调用消费者并抛出将消息放入 DL 队列的错误。
可以在此处查看失败消息的示例:
{
"messageId": "4c030000-306b-1065-1a55-08da035849d1",
"conversationId": "4c030000-306b-1065-3928-08da035849d7",
"sourceAddress": "sb://fakeurlservicebus.servicebus.windows.net/LAP16844_func_bus_joboyybopcegm7rhbdpygsnnyh?autodelete=300",
"destinationAddress": "sb://blainetestservicebus.servicebus.windows.net/test-queue",
"messageType": [
"urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded+BlaineTestMessage"
],
"message": {
"testProperty": "Energy budget"
},
"sentTime": "2022-03-11T12:11:35.6792405Z",
"headers": {},
"host": {
"machineName": "LAP-16844",
"processName": "func",
"processId": 46740,
"assembly": "func",
"assemblyVersion": "3.0.3904.0",
"frameworkVersion": "3.1.20",
"massTransitVersion": "7.3.1.0",
"operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
}
}
消费者看起来就像文档建议的那样在启动时被添加,我已经尝试通过名称显式引用它,但这也给出了同样的错误。
我是不是漏掉了什么明显的东西?
我是运行 Dot Net Core 3.1,并使用所有参考MassTransit库的V7.3.1。
出于某种原因,您生成的消息类型与消费者使用的消息类型不匹配:
{
"messageType": [
"urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded+BlaineTestMessage"
]
}
您生成的消息类型是否实现了该消息contract/interface?