订阅 RabbitMQ 扇出交换后微服务无响应

Microservices Not responding after subscribing to RabbitMQ fanout exchange

我的 .net core3.1 Web 应用程序有 4 个微服务(MasterMS、PartyMS、ProductMS、PurchaseMS) 并使用 Rabbitmq 作为消息代理。

在一个特定的场景中,MasterMS 发布一个事件(insert/update in Company table)到 Rabbitmq exchange (xAlexa),从那里它被扇出到所有订阅的相应队列MS(PartyMS、ProductMS)。

PartyMS 从 CompanyEventPartyMS 队列中获取事件,ProductMS 从 CompanyEventProductMS 队列中获取事件。因此,Party 和 Product 都更新了各自的 Company table,一切都同步且完美。顺便说一句,PurchaseMS 没有订阅,所以不用担心。

现在真正的问题来了。订阅 MS(消费者)在请求其网页时没有响应。 PartyMS 和 ProductMS 网页抛出 SocketException,而非订阅者 PurchaseMS 工作正常。现在,如果我注释掉 PartyMS 订阅的行,它会再次开始工作,尽管它不再获取 CompanyEvent 并且不同步。 有什么见解的朋友吗?

SocketException: 由于目标机器主动拒绝,无法建立连接。 System.Net.Http.ConnectHelper.ConnectAsync(string host, int port, CancellationToken cancellationToken)

public void Publish<T>(T @event) where T : Event
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "xAlexa", type: ExchangeType.Fanout);

                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);
                var eventName = @event.GetType().Name;

                channel.BasicPublish(exchange: "xAlexa",
                                 routingKey: eventName, //string.Empty,
                                 basicProperties: null,
                                 body: body);
            }
        }

开始基本消费

private void StartBasicConsume<T>() where T : Event
        {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                DispatchConsumersAsync = true
            };

            var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            var eventName = typeof(T).Name;
            var msName = typeof(T).FullName;
            string[] str = { };
            str = msName.Split('.');
            eventName += str[1];
            
            channel.ExchangeDeclare(exchange: "xAlexa",
                                        type: ExchangeType.Fanout);
                        
            channel.QueueDeclare(eventName, true, false, false, null);  //channel.QueueDeclare().QueueName;

            channel.QueueBind(queue: eventName,
                              exchange: "xAlexa",
                              routingKey: string.Empty);

            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += Consumer_Received;

            channel.BasicConsume(eventName, true, consumer);
            Console.WriteLine("Consumer Started");
            Console.ReadLine();
        }

     private async Task Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            var eventName = e.RoutingKey;
            var body = e.Body.ToArray();
            //var body = e.Body.Span;
            var message = Encoding.UTF8.GetString(body);
            //var message = Encoding.UTF8.GetString(e.Body);
            Console.WriteLine(message);

            try
            {
                await ProcessEvent(eventName, message).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
            }
        }

从 MVC 应用程序调用 ProductsMS Api(此处是订阅失败的地方,如果未订阅 CompanyEvent 则可以正常工作!)

public class ProductService:IProductService
    {
        private readonly HttpClient _apiCLient;

        public ProductService(HttpClient apiCLient)
        {
            _apiCLient = apiCLient;
        }

       public async Task<List<Product>> GetProducts()
        {
            var uri = "https://localhost:5005/api/ProductApi";
            List<Product> userList = new List<Product>();
            HttpResponseMessage response = await _apiCLient.GetAsync(uri);
            if (response.IsSuccessStatusCode)
            {
                var readTask = response.Content.ReadAsStringAsync().Result;
                userList = JsonConvert.DeserializeObject<List<Product>>(readTask);
            }
            return userList;
        }
}

查找产品MS Api Startup.cs 下面:

namespace Alexa.ProductMS.Api
{
    public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();
            var connectionString = Configuration["DbContextSettings:ConnectionString"];
            var dbPassword = Configuration["DbContextSettings:DbPassword"];
            var builder = new NpgsqlConnectionStringBuilder(connectionString)
            {
                Password = dbPassword
            };
            services.AddDbContext<ProductsDBContext>(opts => opts.UseNpgsql(builder.ConnectionString));

            services.AddMediatR(typeof(Startup));

            RegisterServices(services);
        }

        private void RegisterServices(IServiceCollection services)
        {
            DependencyContainer.RegisterServices(services);
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            });
            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            ConfigureEventBus(app); //WORKS IF COMMENTED; FAILS OTHERWISE <---
        }

        private void ConfigureEventBus(IApplicationBuilder app)
        {
            var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();            
            eventBus.Subscribe<CompanyEvent, CompanyEventHandler>();
            eventBus.Subscribe<PartyEvent, PartyEventHandler>();
        }

    }
}

另见图片:

RabbitMQ fanout exchange

RabbitMQ Queues

exchange

queues

删除最后一行Console.ReadLine();方法 StartBasicConsume()。 当我们在函数中使用该行时,它正在等待任何按键或任何输入。