使用 Azure Functions 我的公共交通消息从未被成功消费

Using Azure Functions my Mass Transit messages are never successfully consumed

我有一个 Azure 函数项目,它向 Azure 服务总线队列发送一条消息。这行得通。我可以在 Azure 上看到传入的消息。

我有另一个 Azure 函数项目,然后接收并使用该消息。尽管服务总线触发器命中成功,但以下对消费者的调用总是中断。

启动看起来像这样:

namespace MassTransitTest.PushNotificationFunction
{
    public class Startup : FunctionsStartup
    {
        private IConfigurationRoot config;

        public override void Configure(IFunctionsHostBuilder builder)
        {
            builder.Services.AddScoped<SendPushNotification>();
            builder.Services.AddMassTransitForAzureFunctions(cfg =>
            {
                cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
            });
        }
    }
}

我的消费者看起来像这样:

namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
    public class EnergyBudgetExceededConsumer : IConsumer<EnergyBudgetExceeded>
    {
        public async Task Consume(ConsumeContext<EnergyBudgetExceeded> context)
        {
            LogContext.Debug?.Log("Successfully consumed message");
        }
    }
}

使用结构命名空间助手:

namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
    public struct ConsumerNamespace
    {

    }
}

消费者使用以下接口:

namespace MassTransitTest.PushNotificationFunction.MessageModels
{
    public interface EnergyBudgetExceeded
    {
        string TestProperty { get; }
    }
}

接收函数如下所示:

public class SendPushNotification : BaseFunction
{
    private readonly IMessageReceiver _receiver;

    public SendPushNotification(IMessageReceiver receiver)
    {
        _receiver = receiver;
    }

    [FunctionName("EnergyBudgetExceeded")]
    public Task EnergyBudgetExceeded([ServiceBusTrigger("test-queue")]
        ServiceBusReceivedMessage message, CancellationToken cancellationToken)
    {
        return _receiver.HandleConsumer<EnergyBudgetExceededConsumer>("test-queue", message, cancellationToken);
    }
}

上面的函数成功命中,但随后无法在“HandleConsumer”方法上调用消费者并抛出将消息放入 DL 队列的错误。

可以在此处查看失败消息的示例:

{
  "messageId": "4c030000-306b-1065-1a55-08da035849d1",
  "conversationId": "4c030000-306b-1065-3928-08da035849d7",
  "sourceAddress": "sb://fakeurlservicebus.servicebus.windows.net/LAP16844_func_bus_joboyybopcegm7rhbdpygsnnyh?autodelete=300",
  "destinationAddress": "sb://blainetestservicebus.servicebus.windows.net/test-queue",
  "messageType": [
    "urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded+BlaineTestMessage"
  ],
  "message": {
    "testProperty": "Energy budget"
  },
  "sentTime": "2022-03-11T12:11:35.6792405Z",
  "headers": {},
  "host": {
    "machineName": "LAP-16844",
    "processName": "func",
    "processId": 46740,
    "assembly": "func",
    "assemblyVersion": "3.0.3904.0",
    "frameworkVersion": "3.1.20",
    "massTransitVersion": "7.3.1.0",
    "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
  }
}

消费者看起来就像文档建议的那样在启动时被添加,我已经尝试通过名称显式引用它,但这也给出了同样的错误。

我是不是漏掉了什么明显的东西?

我是运行 Dot Net Core 3.1,并使用所有参考MassTransit库的V7.3.1。

出于某种原因,您生成的消息类型与消费者使用的消息类型不匹配:

{
  "messageType": [
    "urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded+BlaineTestMessage"
  ]
}

您生成的消息类型是否实现了该消息contract/interface?