如何通过 MassTransit 获得响应?
How to get a response via MassTransit?
我想从消费者 (GetStatusConsumer) 获得响应 (GetStatusResponse)。
我的请求被放入了 RabbitMQ 队列“getstatus”,但我的消费者没有被触发,请求最终超时。
发布方法和消费者位于同一个项目中。
看来问题出在 Startup.cs 中。你能帮帮我吗?
我在 Startup.cs 中有以下代码:
...
// Registers a singleton GetStatusBus whose factory builds a RabbitMQ-backed MassTransit bus
// configured from BusOptions.
services.AddSingleton(provider =>
{
// Bus settings (host URI, credentials, prefetch count, concurrency limit) come from IOptions<BusOptions>.
var getStatusBusOptions = provider.GetRequiredService<IOptions<BusOptions>>().Value;
// NOTE(review): no Start()/StartAsync() call on `bus` is visible in this snippet — confirm the bus
// is started somewhere; an unstarted bus would presumably never deliver to the consumer.
var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
// Broker host and credentials; `host` is not referenced again in this snippet.
var host = sbc.Host(new Uri(getStatusBusOptions.HostUri), h =>
{
h.Username(getStatusBusOptions.UserName);
h.Password(getStatusBusOptions.Password);
});
// Receive endpoint on the "getstatus" queue handled by GetStatusConsumer.
sbc.ReceiveEndpoint("getstatus", ep =>
{
// The service provider is handed to the consumer registration (presumably so the consumer is created via DI).
ep.Consumer<GetStatusConsumer>(provider);
ep.PrefetchCount = getStatusBusOptions.PrefetchCount;
ep.UseConcurrencyLimit(getStatusBusOptions.UseConcurrencyLimit);
});
});
// Wrap the bus together with the host URI it was configured with.
return new GetStatusBus
{
Bus = bus,
HostUri = getStatusBusOptions.HostUri
};
});
...
以下代码位于 GetStatusPublisher 类(GetStatusPublisher.cs)中:
/// <summary>
/// Sends a request through the bus wrapped by <see cref="GetStatusBus"/> and awaits the reply,
/// using the legacy <c>MessageRequestClient</c> API.
/// </summary>
public class GetStatusPublisher : IGetStatusPublisher
{
    readonly GetStatusBus _bus;

    /// <param name="bus">Wrapper exposing the bus instance used to send requests.</param>
    public GetStatusPublisher(GetStatusBus bus)
    {
        _bus = bus;
    }

    /// <summary>
    /// Sends <paramref name="request"/> to the "getstatus" queue and returns the response message.
    /// </summary>
    /// <typeparam name="Tin">Request message type.</typeparam>
    /// <typeparam name="Tout">Response message type.</typeparam>
    /// <param name="request">The request message to send.</param>
    /// <returns>The response message produced by the consumer.</returns>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        // NOTE(review): the address is hard-coded rather than derived from _bus.HostUri — confirm it matches the configured host.
        var serviceAddress = new Uri("rabbitmq://rabbitmq.test.com/jgt/getstatus");
        var timeout = TimeSpan.FromSeconds(30);
        var client = new MessageRequestClient<Tin, Tout>(_bus.Bus, serviceAddress, timeout);
        var resp = await client.Request(request); // <== Times out here; the consumer (GetStatusConsumer) is never invoked
        return resp;
    }
}
这是发布方法:
...
// Publisher dependency held by the calling class (excerpt; surrounding code elided).
readonly IGetStatusPublisher _getStatusPublisher;
...
// Sends statusReq as a GetStatusRequest and awaits a GetStatusResponse.
var resp = await _getStatusPublisher.GetResponse<GetStatusRequest, GetStatusResponse>(statusReq);
消费者有以下代码:
// Consumer for GetStatusRequest messages; derives from MetricWriter.
public class GetStatusConsumer : MetricWriter, IConsumer<GetStatusRequest>
{
// Passes the metrics dependency on to the MetricWriter base constructor; remaining body elided in the snippet.
public GetStatusConsumer(IMetrics metrics) : base(metrics)
{
......
}
// Handles an incoming GetStatusRequest; body elided in the snippet.
// NOTE(review): presumably replies with a GetStatusResponse via the context — not shown here.
public async Task Consume(ConsumeContext<GetStatusRequest> context)
{
....
}
}
首先,我认为你没有启动总线(bus)。这是主要问题。
然而...
您使用的是旧版 MassTransit 吗?你的代码已经严重过时了,我建议更新它以使用当前的版本和语法,因为你上面的代码示例有很多问题。
// Container-based MassTransit setup: consumer registration, RabbitMQ transport,
// hosted service and generic request client.
services.AddMassTransit(registration =>
{
    registration.AddConsumer<GetStatusConsumer>();

    registration.UsingRabbitMq((busContext, rabbit) =>
    {
        // Bus settings come from the configured BusOptions.
        var options = busContext.GetRequiredService<IOptions<BusOptions>>().Value;
        var hostAddress = new Uri(options.HostUri);

        rabbit.Host(hostAddress, credentials =>
        {
            credentials.Username(options.UserName);
            credentials.Password(options.Password);
        });

        rabbit.PrefetchCount = options.PrefetchCount;
        rabbit.ConcurrentMessageLimit = options.UseConcurrencyLimit;

        // Let MassTransit configure receive endpoints for the registered consumers.
        rabbit.ConfigureEndpoints(busContext);
    });
});

// Hosted service that ties the bus lifetime to the application host.
services.AddMassTransitHostedService();

// Makes IRequestClient<T> resolvable for any request type.
services.AddGenericRequestClient();
然后,您只需添加对 IRequestClient<T> 的依赖即可发送请求并获得响应。更新后的发布者代码可能如下所示:
/// <summary>
/// Sends requests through MassTransit's <c>IRequestClient{T}</c> and returns the response message.
/// </summary>
public class GetStatusPublisher :
    IGetStatusPublisher
{
    // Declared here: the original snippet assigned this field without declaring it.
    readonly IServiceProvider _provider;

    /// <param name="provider">Service provider used to create a scope for resolving the request client.</param>
    public GetStatusPublisher(IServiceProvider provider)
    {
        _provider = provider;
    }

    /// <summary>
    /// Sends <paramref name="request"/> and returns the message of the received response.
    /// </summary>
    /// <typeparam name="Tin">Request message type.</typeparam>
    /// <typeparam name="Tout">Response message type.</typeparam>
    /// <param name="request">The request message to send.</param>
    /// <returns>The response message.</returns>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        // IRequestClient<T> is a scoped service: resolving it from the root provider throws
        // InvalidOperationException ("Cannot resolve scoped service ... from root provider"),
        // so resolve it inside a dedicated scope instead.
        using (var scope = _provider.CreateScope())
        {
            var client = scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
            var response = await client.GetResponse<Tout>(request);
            return response.Message;
        }
    }
}
如果我使用以下代码,会收到错误消息:“System.InvalidOperationException:无法从根提供程序解析作用域服务 'MassTransit.IRequestClient`1[GetStatusTry.Contracts.GetStatusRequest]'……”。
/// <summary>
/// Resolves an <c>IRequestClient{Tin}</c> directly from the injected provider, sends the request
/// and returns the response message. Any exception is written to the console and rethrown.
/// </summary>
public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
{
    try
    {
        // << --- the reported InvalidOperationException is raised by this resolution
        var requestClient = _provider.GetRequiredService<IRequestClient<Tin>>();
        var reply = await requestClient.GetResponse<Tout>(request);
        return reply.Message;
    }
    catch (Exception failure)
    {
        // Log the message, then rethrow with the original stack trace.
        Console.WriteLine(failure.Message);
        throw;
    }
}
但是这段代码工作正常:
/// <summary>
/// Controller that sends a GetStatusRequest through an injected request client
/// and returns the response message.
/// </summary>
public class Dev2Controller : ControllerBase
{
    private readonly IRequestClient<GetStatusRequest> _client;

    // getStatusPublisher is accepted by the constructor but not used by this controller.
    public Dev2Controller(IGetStatusPublisher getStatusPublisher, IRequestClient<GetStatusRequest> client)
    {
        _client = client;
    }

    /// <summary>Sends a sample status request and returns the reply message.</summary>
    [HttpPost]
    public async Task<GetStatusResponse> GetStatus2()
    {
        var statusRequest = new GetStatusRequest
        {
            Statuses = new List<int> { 1, 2, 3 },
            TerminalDescr = "try to get status2"
        };

        var reply = await _client.GetResponse<GetStatusResponse>(statusRequest);
        return reply.Message;
    }
}
以下代码可以正常工作:
/// <summary>
/// Sends requests via an <c>IRequestClient{T}</c> resolved from a freshly created service scope.
/// </summary>
public class GetStatusPublisher : IGetStatusPublisher
{
    readonly IServiceProvider _provider;

    /// <param name="provider">Provider from which a scope is created for each request.</param>
    public GetStatusPublisher(IServiceProvider provider)
    {
        _provider = provider;
    }

    /// <summary>
    /// Sends <paramref name="request"/> and returns the response message.
    /// Any exception is written to the console and rethrown.
    /// </summary>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        try
        {
            // The scope lives for the duration of the request/response exchange.
            using (var requestScope = _provider.CreateScope())
            {
                var scopedServices = requestScope.ServiceProvider;
                var requestClient = scopedServices.GetRequiredService<IRequestClient<Tin>>();
                var reply = await requestClient.GetResponse<Tout>(request);
                return reply.Message;
            }
        }
        catch (Exception failure)
        {
            // Log the message, then rethrow with the original stack trace.
            Console.WriteLine(failure.Message);
            throw;
        }
    }
}
我想从消费者 (GetStatusConsumer) 获得响应 (GetStatusResponse)。
我的请求被放入了 RabbitMQ 队列“getstatus”,但我的消费者没有被触发,请求最终超时。
发布方法和消费者位于同一个项目中。
看来问题出在 Startup.cs 中。你能帮帮我吗?
我在 Startup.cs 中有以下代码:
...
// Registers a singleton GetStatusBus whose factory builds a RabbitMQ-backed MassTransit bus
// configured from BusOptions.
services.AddSingleton(provider =>
{
// Bus settings (host URI, credentials, prefetch count, concurrency limit) come from IOptions<BusOptions>.
var getStatusBusOptions = provider.GetRequiredService<IOptions<BusOptions>>().Value;
// NOTE(review): no Start()/StartAsync() call on `bus` is visible in this snippet — confirm the bus
// is started somewhere; an unstarted bus would presumably never deliver to the consumer.
var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
// Broker host and credentials; `host` is not referenced again in this snippet.
var host = sbc.Host(new Uri(getStatusBusOptions.HostUri), h =>
{
h.Username(getStatusBusOptions.UserName);
h.Password(getStatusBusOptions.Password);
});
// Receive endpoint on the "getstatus" queue handled by GetStatusConsumer.
sbc.ReceiveEndpoint("getstatus", ep =>
{
// The service provider is handed to the consumer registration (presumably so the consumer is created via DI).
ep.Consumer<GetStatusConsumer>(provider);
ep.PrefetchCount = getStatusBusOptions.PrefetchCount;
ep.UseConcurrencyLimit(getStatusBusOptions.UseConcurrencyLimit);
});
});
// Wrap the bus together with the host URI it was configured with.
return new GetStatusBus
{
Bus = bus,
HostUri = getStatusBusOptions.HostUri
};
});
...
以下代码位于 GetStatusPublisher 类(GetStatusPublisher.cs)中:
/// <summary>
/// Sends a request through the bus wrapped by <see cref="GetStatusBus"/> and awaits the reply,
/// using the legacy <c>MessageRequestClient</c> API.
/// </summary>
public class GetStatusPublisher : IGetStatusPublisher
{
    readonly GetStatusBus _bus;

    /// <param name="bus">Wrapper exposing the bus instance used to send requests.</param>
    public GetStatusPublisher(GetStatusBus bus)
    {
        _bus = bus;
    }

    /// <summary>
    /// Sends <paramref name="request"/> to the "getstatus" queue and returns the response message.
    /// </summary>
    /// <typeparam name="Tin">Request message type.</typeparam>
    /// <typeparam name="Tout">Response message type.</typeparam>
    /// <param name="request">The request message to send.</param>
    /// <returns>The response message produced by the consumer.</returns>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        // NOTE(review): the address is hard-coded rather than derived from _bus.HostUri — confirm it matches the configured host.
        var serviceAddress = new Uri("rabbitmq://rabbitmq.test.com/jgt/getstatus");
        var timeout = TimeSpan.FromSeconds(30);
        var client = new MessageRequestClient<Tin, Tout>(_bus.Bus, serviceAddress, timeout);
        var resp = await client.Request(request); // <== Times out here; the consumer (GetStatusConsumer) is never invoked
        return resp;
    }
}
这是发布方法:
...
// Publisher dependency held by the calling class (excerpt; surrounding code elided).
readonly IGetStatusPublisher _getStatusPublisher;
...
// Sends statusReq as a GetStatusRequest and awaits a GetStatusResponse.
var resp = await _getStatusPublisher.GetResponse<GetStatusRequest, GetStatusResponse>(statusReq);
消费者有以下代码:
// Consumer for GetStatusRequest messages; derives from MetricWriter.
public class GetStatusConsumer : MetricWriter, IConsumer<GetStatusRequest>
{
// Passes the metrics dependency on to the MetricWriter base constructor; remaining body elided in the snippet.
public GetStatusConsumer(IMetrics metrics) : base(metrics)
{
......
}
// Handles an incoming GetStatusRequest; body elided in the snippet.
// NOTE(review): presumably replies with a GetStatusResponse via the context — not shown here.
public async Task Consume(ConsumeContext<GetStatusRequest> context)
{
....
}
}
首先,我认为你没有启动总线(bus)。这是主要问题。
然而...
您使用的是旧版 MassTransit 吗?你的代码已经严重过时了,我建议更新它以使用当前的版本和语法,因为你上面的代码示例有很多问题。
// Container-based MassTransit setup: consumer registration, RabbitMQ transport,
// hosted service and generic request client.
services.AddMassTransit(registration =>
{
    registration.AddConsumer<GetStatusConsumer>();

    registration.UsingRabbitMq((busContext, rabbit) =>
    {
        // Bus settings come from the configured BusOptions.
        var options = busContext.GetRequiredService<IOptions<BusOptions>>().Value;
        var hostAddress = new Uri(options.HostUri);

        rabbit.Host(hostAddress, credentials =>
        {
            credentials.Username(options.UserName);
            credentials.Password(options.Password);
        });

        rabbit.PrefetchCount = options.PrefetchCount;
        rabbit.ConcurrentMessageLimit = options.UseConcurrencyLimit;

        // Let MassTransit configure receive endpoints for the registered consumers.
        rabbit.ConfigureEndpoints(busContext);
    });
});

// Hosted service that ties the bus lifetime to the application host.
services.AddMassTransitHostedService();

// Makes IRequestClient<T> resolvable for any request type.
services.AddGenericRequestClient();
然后,您只需添加对 IRequestClient<T> 的依赖即可发送请求并获得响应。更新后的发布者代码可能如下所示:
/// <summary>
/// Sends requests through MassTransit's <c>IRequestClient{T}</c> and returns the response message.
/// </summary>
public class GetStatusPublisher :
    IGetStatusPublisher
{
    // Declared here: the original snippet assigned this field without declaring it.
    readonly IServiceProvider _provider;

    /// <param name="provider">Service provider used to create a scope for resolving the request client.</param>
    public GetStatusPublisher(IServiceProvider provider)
    {
        _provider = provider;
    }

    /// <summary>
    /// Sends <paramref name="request"/> and returns the message of the received response.
    /// </summary>
    /// <typeparam name="Tin">Request message type.</typeparam>
    /// <typeparam name="Tout">Response message type.</typeparam>
    /// <param name="request">The request message to send.</param>
    /// <returns>The response message.</returns>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        // IRequestClient<T> is a scoped service: resolving it from the root provider throws
        // InvalidOperationException ("Cannot resolve scoped service ... from root provider"),
        // so resolve it inside a dedicated scope instead.
        using (var scope = _provider.CreateScope())
        {
            var client = scope.ServiceProvider.GetRequiredService<IRequestClient<Tin>>();
            var response = await client.GetResponse<Tout>(request);
            return response.Message;
        }
    }
}
如果我使用以下代码,会收到错误消息:“System.InvalidOperationException:无法从根提供程序解析作用域服务 'MassTransit.IRequestClient`1[GetStatusTry.Contracts.GetStatusRequest]'……”。
/// <summary>
/// Resolves an <c>IRequestClient{Tin}</c> directly from the injected provider, sends the request
/// and returns the response message. Any exception is written to the console and rethrown.
/// </summary>
public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
{
    try
    {
        // << --- the reported InvalidOperationException is raised by this resolution
        var requestClient = _provider.GetRequiredService<IRequestClient<Tin>>();
        var reply = await requestClient.GetResponse<Tout>(request);
        return reply.Message;
    }
    catch (Exception failure)
    {
        // Log the message, then rethrow with the original stack trace.
        Console.WriteLine(failure.Message);
        throw;
    }
}
但是这段代码工作正常:
/// <summary>
/// Controller that sends a GetStatusRequest through an injected request client
/// and returns the response message.
/// </summary>
public class Dev2Controller : ControllerBase
{
    private readonly IRequestClient<GetStatusRequest> _client;

    // getStatusPublisher is accepted by the constructor but not used by this controller.
    public Dev2Controller(IGetStatusPublisher getStatusPublisher, IRequestClient<GetStatusRequest> client)
    {
        _client = client;
    }

    /// <summary>Sends a sample status request and returns the reply message.</summary>
    [HttpPost]
    public async Task<GetStatusResponse> GetStatus2()
    {
        var statusRequest = new GetStatusRequest
        {
            Statuses = new List<int> { 1, 2, 3 },
            TerminalDescr = "try to get status2"
        };

        var reply = await _client.GetResponse<GetStatusResponse>(statusRequest);
        return reply.Message;
    }
}
以下代码可以正常工作:
/// <summary>
/// Sends requests via an <c>IRequestClient{T}</c> resolved from a freshly created service scope.
/// </summary>
public class GetStatusPublisher : IGetStatusPublisher
{
    readonly IServiceProvider _provider;

    /// <param name="provider">Provider from which a scope is created for each request.</param>
    public GetStatusPublisher(IServiceProvider provider)
    {
        _provider = provider;
    }

    /// <summary>
    /// Sends <paramref name="request"/> and returns the response message.
    /// Any exception is written to the console and rethrown.
    /// </summary>
    public async Task<Tout> GetResponse<Tin, Tout>(Tin request) where Tin : class where Tout : class
    {
        try
        {
            // The scope lives for the duration of the request/response exchange.
            using (var requestScope = _provider.CreateScope())
            {
                var scopedServices = requestScope.ServiceProvider;
                var requestClient = scopedServices.GetRequiredService<IRequestClient<Tin>>();
                var reply = await requestClient.GetResponse<Tout>(request);
                return reply.Message;
            }
        }
        catch (Exception failure)
        {
            // Log the message, then rethrow with the original stack trace.
            Console.WriteLine(failure.Message);
            throw;
        }
    }
}