订阅 RabbitMQ 扇出交换后微服务无响应
Microservices Not responding after subscribing to RabbitMQ fanout exchange
我的 .net core3.1 Web 应用程序有 4 个微服务(MasterMS、PartyMS、ProductMS、PurchaseMS)
并使用 Rabbitmq 作为消息代理。
在一个特定的场景中,MasterMS 发布一个事件(insert/update in Company table)到 Rabbitmq exchange (xAlexa),从那里它被扇出到所有订阅的相应队列MS(PartyMS、ProductMS)。
PartyMS 从 CompanyEventPartyMS 队列中获取事件,ProductMS 从 CompanyEventProductMS 队列中获取事件。因此,Party 和 Product 都更新了各自的 Company table,一切都同步且完美。顺便说一句,PurchaseMS 没有订阅,所以不用担心。
现在真正的问题来了。订阅 MS(消费者)在请求其网页时没有响应。 PartyMS 和 ProductMS 网页抛出 SocketException,而非订阅者 PurchaseMS 工作正常。现在,如果我注释掉 PartyMS 订阅的行,它会再次开始工作,尽管它不再获取 CompanyEvent 并且不同步。
有什么见解的朋友吗?
SocketException: 由于目标机器主动拒绝,无法建立连接。
System.Net.Http.ConnectHelper.ConnectAsync(string host, int port, CancellationToken cancellationToken)
public void Publish<T>(T @event) where T : Event
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "xAlexa", type: ExchangeType.Fanout);
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
var eventName = @event.GetType().Name;
channel.BasicPublish(exchange: "xAlexa",
routingKey: eventName, //string.Empty,
basicProperties: null,
body: body);
}
}
开始基本消费
private void StartBasicConsume<T>() where T : Event
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
DispatchConsumersAsync = true
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var eventName = typeof(T).Name;
var msName = typeof(T).FullName;
string[] str = { };
str = msName.Split('.');
eventName += str[1];
channel.ExchangeDeclare(exchange: "xAlexa",
type: ExchangeType.Fanout);
channel.QueueDeclare(eventName, true, false, false, null); //channel.QueueDeclare().QueueName;
channel.QueueBind(queue: eventName,
exchange: "xAlexa",
routingKey: string.Empty);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(eventName, true, consumer);
Console.WriteLine("Consumer Started");
Console.ReadLine();
}
private async Task Consumer_Received(object sender, BasicDeliverEventArgs e)
{
var eventName = e.RoutingKey;
var body = e.Body.ToArray();
//var body = e.Body.Span;
var message = Encoding.UTF8.GetString(body);
//var message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine(message);
try
{
await ProcessEvent(eventName, message).ConfigureAwait(false);
}
catch (Exception ex)
{
}
}
从 MVC 应用程序调用 ProductsMS Api(此处是订阅失败的地方,如果未订阅 CompanyEvent 则可以正常工作!)
public class ProductService:IProductService
{
private readonly HttpClient _apiCLient;
public ProductService(HttpClient apiCLient)
{
_apiCLient = apiCLient;
}
public async Task<List<Product>> GetProducts()
{
var uri = "https://localhost:5005/api/ProductApi";
List<Product> userList = new List<Product>();
HttpResponseMessage response = await _apiCLient.GetAsync(uri);
if (response.IsSuccessStatusCode)
{
var readTask = response.Content.ReadAsStringAsync().Result;
userList = JsonConvert.DeserializeObject<List<Product>>(readTask);
}
return userList;
}
}
查找产品MS Api Startup.cs 下面:
namespace Alexa.ProductMS.Api
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
var connectionString = Configuration["DbContextSettings:ConnectionString"];
var dbPassword = Configuration["DbContextSettings:DbPassword"];
var builder = new NpgsqlConnectionStringBuilder(connectionString)
{
Password = dbPassword
};
services.AddDbContext<ProductsDBContext>(opts => opts.UseNpgsql(builder.ConnectionString));
services.AddMediatR(typeof(Startup));
RegisterServices(services);
}
private void RegisterServices(IServiceCollection services)
{
DependencyContainer.RegisterServices(services);
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
});
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
ConfigureEventBus(app); //WORKS IF COMMENTED; FAILS OTHERWISE <---
}
private void ConfigureEventBus(IApplicationBuilder app)
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<CompanyEvent, CompanyEventHandler>();
eventBus.Subscribe<PartyEvent, PartyEventHandler>();
}
}
}
另见图片:
RabbitMQ fanout exchange
RabbitMQ Queues
exchange
queues
删除最后一行Console.ReadLine();方法 StartBasicConsume()。
当我们在函数中使用该行时,它正在等待任何按键或任何输入。
我的 .net core3.1 Web 应用程序有 4 个微服务(MasterMS、PartyMS、ProductMS、PurchaseMS) 并使用 Rabbitmq 作为消息代理。
在一个特定的场景中,MasterMS 发布一个事件(insert/update in Company table)到 Rabbitmq exchange (xAlexa),从那里它被扇出到所有订阅的相应队列MS(PartyMS、ProductMS)。
PartyMS 从 CompanyEventPartyMS 队列中获取事件,ProductMS 从 CompanyEventProductMS 队列中获取事件。因此,Party 和 Product 都更新了各自的 Company table,一切都同步且完美。顺便说一句,PurchaseMS 没有订阅,所以不用担心。
现在真正的问题来了。订阅 MS(消费者)在请求其网页时没有响应。 PartyMS 和 ProductMS 网页抛出 SocketException,而非订阅者 PurchaseMS 工作正常。现在,如果我注释掉 PartyMS 订阅的行,它会再次开始工作,尽管它不再获取 CompanyEvent 并且不同步。 有什么见解的朋友吗?
SocketException: 由于目标机器主动拒绝,无法建立连接。 System.Net.Http.ConnectHelper.ConnectAsync(string host, int port, CancellationToken cancellationToken)
public void Publish<T>(T @event) where T : Event
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "xAlexa", type: ExchangeType.Fanout);
var message = JsonConvert.SerializeObject(@event);
var body = Encoding.UTF8.GetBytes(message);
var eventName = @event.GetType().Name;
channel.BasicPublish(exchange: "xAlexa",
routingKey: eventName, //string.Empty,
basicProperties: null,
body: body);
}
}
开始基本消费
private void StartBasicConsume<T>() where T : Event
{
var factory = new ConnectionFactory()
{
HostName = "localhost",
DispatchConsumersAsync = true
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
var eventName = typeof(T).Name;
var msName = typeof(T).FullName;
string[] str = { };
str = msName.Split('.');
eventName += str[1];
channel.ExchangeDeclare(exchange: "xAlexa",
type: ExchangeType.Fanout);
channel.QueueDeclare(eventName, true, false, false, null); //channel.QueueDeclare().QueueName;
channel.QueueBind(queue: eventName,
exchange: "xAlexa",
routingKey: string.Empty);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(eventName, true, consumer);
Console.WriteLine("Consumer Started");
Console.ReadLine();
}
private async Task Consumer_Received(object sender, BasicDeliverEventArgs e)
{
var eventName = e.RoutingKey;
var body = e.Body.ToArray();
//var body = e.Body.Span;
var message = Encoding.UTF8.GetString(body);
//var message = Encoding.UTF8.GetString(e.Body);
Console.WriteLine(message);
try
{
await ProcessEvent(eventName, message).ConfigureAwait(false);
}
catch (Exception ex)
{
}
}
从 MVC 应用程序调用 ProductsMS Api(此处是订阅失败的地方,如果未订阅 CompanyEvent 则可以正常工作!)
public class ProductService:IProductService
{
private readonly HttpClient _apiCLient;
public ProductService(HttpClient apiCLient)
{
_apiCLient = apiCLient;
}
public async Task<List<Product>> GetProducts()
{
var uri = "https://localhost:5005/api/ProductApi";
List<Product> userList = new List<Product>();
HttpResponseMessage response = await _apiCLient.GetAsync(uri);
if (response.IsSuccessStatusCode)
{
var readTask = response.Content.ReadAsStringAsync().Result;
userList = JsonConvert.DeserializeObject<List<Product>>(readTask);
}
return userList;
}
}
查找产品MS Api Startup.cs 下面:
namespace Alexa.ProductMS.Api
{
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
var connectionString = Configuration["DbContextSettings:ConnectionString"];
var dbPassword = Configuration["DbContextSettings:DbPassword"];
var builder = new NpgsqlConnectionStringBuilder(connectionString)
{
Password = dbPassword
};
services.AddDbContext<ProductsDBContext>(opts => opts.UseNpgsql(builder.ConnectionString));
services.AddMediatR(typeof(Startup));
RegisterServices(services);
}
private void RegisterServices(IServiceCollection services)
{
DependencyContainer.RegisterServices(services);
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseHttpsRedirection();
});
app.UseRouting();
app.UseAuthorization();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
ConfigureEventBus(app); //WORKS IF COMMENTED; FAILS OTHERWISE <---
}
private void ConfigureEventBus(IApplicationBuilder app)
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
eventBus.Subscribe<CompanyEvent, CompanyEventHandler>();
eventBus.Subscribe<PartyEvent, PartyEventHandler>();
}
}
}
另见图片:
RabbitMQ fanout exchange
RabbitMQ Queues
exchange
queues
删除最后一行Console.ReadLine();方法 StartBasicConsume()。 当我们在函数中使用该行时,它正在等待任何按键或任何输入。