如何通过 MassTransit 获得响应?

How do I get a response via MassTransit?

我想从消费者 (GetStatusConsumer) 获得响应 (GetStatusResponse)。

我的请求已被放入 RabbitMQ 队列“getstatus”,但我的消费者没有被触发,请求最终超时。

发布方法和消费者位于同一个项目中。

看来问题出在 Startup.cs 中。你能帮帮我吗?

我在 Startup.cs 中有以下代码:
...
// Registers a singleton GetStatusBus. The factory reads BusOptions from the container,
// builds a RabbitMQ-backed MassTransit bus and wraps it together with the host URI.
// NOTE(review): nothing in this snippet calls Start()/StartAsync() on the created bus.
// Unless it is started elsewhere, the "getstatus" endpoint will not consume messages — confirm.
services.AddSingleton(provider =>
            {
                // Connection and tuning settings come from the configured BusOptions.
                var getStatusBusOptions = provider.GetRequiredService<IOptions<BusOptions>>().Value;
                var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
                {
                    // Broker address and credentials. `host` is not used again in this snippet.
                    var host = sbc.Host(new Uri(getStatusBusOptions.HostUri), h =>
                    {
                        h.Username(getStatusBusOptions.UserName);
                        h.Password(getStatusBusOptions.Password);
                    });
                    // Receive endpoint bound to the "getstatus" queue.
                    sbc.ReceiveEndpoint("getstatus", ep =>
                    {
                        // The service provider is handed to MassTransit so it can supply the consumer.
                        ep.Consumer<GetStatusConsumer>(provider);
                        ep.PrefetchCount = getStatusBusOptions.PrefetchCount;
                        ep.UseConcurrencyLimit(getStatusBusOptions.UseConcurrencyLimit);
                    });

                });


            // The bus is returned exactly as created above (no start call in between).
            return new GetStatusBus
            { 
                Bus = bus,
                HostUri = getStatusBusOptions.HostUri
            };
        });
...

以下代码位于 GetStatusPublisher 类(GetStatusPublisher.cs)中:

/// <summary>
/// Sends a request through the bus held by <see cref="GetStatusBus"/> and waits for the reply,
/// using the legacy <c>MessageRequestClient</c> API.
/// </summary>
public class GetStatusPublisher : IGetStatusPublisher
{
    readonly GetStatusBus _bus;

    /// <param name="bus">Wrapper exposing the MassTransit bus used to send requests.</param>
    public GetStatusPublisher(GetStatusBus bus)
    {
        _bus = bus;
    }

    /// <summary>
    /// Sends <paramref name="request"/> to the "getstatus" queue and returns the response message.
    /// </summary>
    /// <typeparam name="Tin">Request message type.</typeparam>
    /// <typeparam name="Tout">Response message type.</typeparam>
    /// <param name="request">The request message to send.</param>
    /// <returns>The response message received from the consumer.</returns>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        // Plain literal: the address contains no interpolated values.
        var serviceAddress = new Uri("rabbitmq://rabbitmq.test.com/jgt/getstatus");
        var timeout = TimeSpan.FromSeconds(30);

        var client = new MessageRequestClient<Tin, Tout>(_bus.Bus, serviceAddress, timeout);

        var resp = await client.Request(request); // <== times out here; the consumer (GetStatusConsumer) is never invoked

        return resp;
    }
}

这是发布方法:

...
 readonly IGetStatusPublisher _getStatusPublisher;
...
     var resp = await _getStatusPublisher.GetResponse<GetStatusRequest, GetStatusResponse>(statusReq);

消费者有以下代码:

/// <summary>
/// MassTransit consumer for <c>GetStatusRequest</c> messages. Derives from <c>MetricWriter</c>.
/// </summary>
public class GetStatusConsumer : MetricWriter, IConsumer<GetStatusRequest>
{
        /// <summary>Passes the injected metrics instance to the <c>MetricWriter</c> base class.</summary>
        /// <param name="metrics">Metrics instance forwarded to the base constructor.</param>
        public GetStatusConsumer(IMetrics metrics) : base(metrics)
        {
......
        }

        /// <summary>Handles one incoming <c>GetStatusRequest</c>.</summary>
        /// <param name="context">Consume context carrying the request message.</param>
        // NOTE(review): body is elided here; presumably it replies with a GetStatusResponse — confirm.
        public async Task Consume(ConsumeContext<GetStatusRequest> context)
        {
....
         }

}

首先,我认为你没有启动总线(bus)。这是主要问题。

然而...

您使用的是旧版 MassTransit 吗?您的代码已经严重过时了,我建议更新它以使用当前的版本和语法,因为您上面的代码示例存在很多问题。

// Container-based MassTransit setup: register the consumer, configure the RabbitMQ
// transport from BusOptions, and let MassTransit create the receive endpoints.
services.AddMassTransit(registration =>
{
    registration.AddConsumer<GetStatusConsumer>();

    registration.UsingRabbitMq((busContext, rabbit) =>
    {
        // Connection and tuning settings come from the configured BusOptions.
        var busOptions = busContext.GetRequiredService<IOptions<BusOptions>>().Value;
        var brokerAddress = new Uri(busOptions.HostUri);

        rabbit.Host(brokerAddress, credentials =>
        {
            credentials.Username(busOptions.UserName);
            credentials.Password(busOptions.Password);
        });

        rabbit.PrefetchCount = busOptions.PrefetchCount;
        rabbit.ConcurrentMessageLimit = busOptions.UseConcurrencyLimit;

        // Endpoints are configured from the consumers registered above.
        rabbit.ConfigureEndpoints(busContext);
    });
});

// Hosted service registration (per the MassTransit docs, this starts and stops the bus with the host).
services.AddMassTransitHostedService();
// Makes IRequestClient<T> resolvable for any request type.
services.AddGenericRequestClient();

然后,您只需添加对 IRequestClient<T> 的依赖,即可发送请求并获得响应。更新后的发布者代码可能如下所示:

/// <summary>
/// Sends a request through MassTransit's <c>IRequestClient&lt;T&gt;</c> and returns the response message.
/// </summary>
public class GetStatusPublisher :
    IGetStatusPublisher
{
    // Backing field for the injected provider (the constructor assigns it, so it must be declared).
    readonly IServiceProvider _provider;

    /// <param name="provider">Service provider used to create a scope per request.</param>
    public GetStatusPublisher(IServiceProvider provider)
    {
        _provider = provider;
    }

    /// <summary>
    /// Sends <paramref name="request"/> and awaits a response of type <typeparamref name="Tout"/>.
    /// </summary>
    /// <typeparam name="Tin">Request message type.</typeparam>
    /// <typeparam name="Tout">Response message type.</typeparam>
    /// <param name="request">The request message to send.</param>
    /// <returns>The message carried by the response.</returns>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        // IRequestClient<T> is a scoped service: resolving it from the root provider throws
        // "Cannot resolve scoped service ... from root provider", so resolve it inside a scope.
        using (var scope = _provider.CreateScope())
        {
            var client = scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();

            var response = await client.GetResponse<Tout>(request);

            return response.Message;
        }
    }
}

如果我使用以下代码,会收到错误消息:“System.InvalidOperationException: Cannot resolve scoped service 'MassTransit.IRequestClient`1[GetStatusTry.Contracts.GetStatusRequest]' from root provider......”(无法从根提供程序解析作用域服务)。

   // Failing variant: resolves IRequestClient<Tin> directly from the injected provider,
   // sends the request and returns the response message. Any exception is written to the
   // console and rethrown with its original stack trace.
   public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
        {
            try
            {
                // Reported to throw InvalidOperationException here: the scoped request client
                // is being resolved from the root provider.
                var client = _provider.GetRequiredService<IRequestClient<Tin>>(); // << --- here is error 

                var response = await client.GetResponse<Tout>(request);

                return response.Message;
            }
            catch (Exception ex)
            {
                // Log the message only, then rethrow without resetting the stack trace.
                Console.WriteLine(ex.Message);
                throw;
            }

        }

但是这段代码工作正常:

/// <summary>
/// Controller that sends a <c>GetStatusRequest</c> through an injected request client.
/// </summary>
public class Dev2Controller : ControllerBase
{
    private readonly IRequestClient<GetStatusRequest> _client;

    /// <param name="getStatusPublisher">Injected publisher; not used by this controller.</param>
    /// <param name="client">Request client used to send <c>GetStatusRequest</c> messages.</param>
    public Dev2Controller(IGetStatusPublisher getStatusPublisher, IRequestClient<GetStatusRequest> client)
    {
        _client = client;
    }

    /// <summary>Sends a fixed sample request and returns the response message.</summary>
    [HttpPost]
    public async Task<GetStatusResponse> GetStatus2()
    {
        var statusRequest = new GetStatusRequest
        {
            Statuses = new List<int> { 1, 2, 3 },
            TerminalDescr = "try to get status2"
        };

        var reply = await _client.GetResponse<GetStatusResponse>(statusRequest);

        return reply.Message;
    }
}

此代码有效

/// <summary>
/// Sends requests through a scoped <c>IRequestClient&lt;T&gt;</c>, creating one DI scope per call.
/// </summary>
public class GetStatusPublisher : IGetStatusPublisher
{
    readonly IServiceProvider _provider;

    /// <param name="provider">Service provider from which per-request scopes are created.</param>
    public GetStatusPublisher(IServiceProvider provider)
    {
        _provider = provider;
    }

    /// <summary>
    /// Sends <paramref name="request"/> and returns the response message. Any exception is
    /// written to the console and rethrown.
    /// </summary>
    /// <typeparam name="Tin">Request message type.</typeparam>
    /// <typeparam name="Tout">Response message type.</typeparam>
    /// <param name="request">The request message to send.</param>
    /// <returns>The message carried by the response.</returns>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        try
        {
            return await RequestInNewScope<Tin, Tout>(request);
        }
        catch (Exception error)
        {
            Console.WriteLine(error.Message);
            throw;
        }
    }

    // Resolves the request client inside a fresh scope; the scope is disposed before returning.
    async Task<Tout> RequestInNewScope<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        using (var scope = _provider.CreateScope())
        {
            var requestClient = scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
            var reply = await requestClient.GetResponse<Tout>(request);
            return reply.Message;
        }
    }
}