公共交通 Azure 服务总线和 DI Request/Response 模型

Masstransit Azure Service Bus and DI Request/Response model

我遇到了 Azure 服务总线以及与 Masstransit 和 ASP.NET 核心 DI 集成的问题。 当我尝试为 IBusIRequestClient 设置 DI 并将消息发送到请求端点时,我得到 RequestTimeoutException 但请求端点获取消息并对其进行处理。 当我尝试使用 RabbitMQ 执行相同操作或使用 Azure 服务总线在本地创建 IBus 时,一切正常。 问题仅在我尝试通过 ASP.NET 核心 DI 设置 Azure 服务总线时适用。

设置 azure 服务总线:

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }

    public IConfiguration Configuration { get; }

    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddMvc();

        var bus = InitBus();

        services.AddSingleton<IPublishEndpoint>(bus);
        services.AddSingleton<ISendEndpointProvider>(bus);
        services.AddSingleton<IBus>(bus);

        var timeout = TimeSpan.FromSeconds(10);
        var serviceAddress = new Uri("url");

        services.AddScoped<IRequestClient<SubmitOrder, OrderAccepted>>(x =>
            new MessageRequestClient<SubmitOrder, OrderAccepted>(x.GetRequiredService<IBus>(), serviceAddress, timeout, timeout));

        bus.Start();
    }

    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        if (env.IsDevelopment()) app.UseDeveloperExceptionPage();
        var bus = InitBus();

        bus.Start();

        app.UseMvc();
        //            app.Run();
    }

    public IBusControl InitBus()
    {
        var bus = Bus.Factory.CreateUsingAzureServiceBus(x => x.Host(new Uri(""), h =>
        {
            h.OperationTimeout = TimeSpan.FromSeconds(5);
            h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("KeyName", "KeyValue");
        }));

        return bus;
    }
}

调用 IRequestClient

[Route("api/[controller]")]
public class ValuesController : Controller
{
    private readonly IRequestClient<SubmitOrder, OrderAccepted> _requestClient;

    public ValuesController(IRequestClient<SubmitOrder, OrderAccepted> requestClient)
    {
        _requestClient = requestClient;
    }

    [HttpGet("{id}")]
    public async Task<IActionResult> Get(int id)
    {
        try
        {
            OrderAccepted result = await _requestClient.Request(new { OrderId = id });

            return Accepted(result.OrderId);
        }
        catch (RequestTimeoutException exception)
        {
            return StatusCode((int)HttpStatusCode.RequestTimeout);
        }
    }
}

接收器设置:

 class Program
{
    static void Main(string[] args)
    {
        Start().Wait();
    }

    public static async Task Start()
    {
        var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
            var host = cfg.Host(new Uri("url"), h =>
            {
                h.OperationTimeout = TimeSpan.FromSeconds(5);
                h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider("KeyName", "KeyValue=");
            });

            cfg.ReceiveEndpoint(host, "order-service", e =>
            {
                e.Handler<SubmitOrder>(context => context.RespondAsync<OrderAccepted>(new
                {
                    context.Message.OrderId
                }));
            });
        });

        await bus.StartAsync();

        try
        {
            Console.WriteLine("Working....");

            Console.ReadLine();
        }
        catch (Exception e)
        {
            Console.WriteLine(e);
        }
        finally
        {
            await bus.StopAsync();
        }
    }
}

首先,我建议查看创建的示例(它使用 RabbitMQ,但应该很容易转换为 Azure 服务总线):

https://github.com/phatboyg/Sample-DotNetCore-DI

使用此示例可以正确完成 DI 使用模式。你的例子有几处不太正确。您还在使用未正确捕获 ConsumeContext 以进行响应跟踪的旧版请求客户端。