ASP.NET 核心 WebApplicationFactory CreateClient 引导服务器在 GitLab CI 上阻塞 运行 在 Docker Compose 环境中

ASP.NET Core WebApplicationFactory CreateClient Booting Server Blocking on GitLab CI When Run In Docker Compose Environment

我的 ASP.NET Core 3.1 API 有一个功能测试项目,它使用 WebApplicationFactory 为 API 创建一个测试服务器。测试在本地和本地 docker-compose 环境中通过。

然而,当 运行 在同一 docker-compose 环境中的 GitLab CI 服务器上时,WebApplication 工厂的 CreateClient 方法被阻塞。当测试服务器启动时,汇合的 Kafka 管理服务被阻塞,即没有确认日志消息显示确认创建的主题。我在 GitLab 上创建了一个小 project 来强调这个问题。

这个问题似乎与 WebApplication 测试服务器和 Confluent Kafka 不知何故有关,因为我已经创建了一个 docker-compose 堆栈,它在 GitLab CI 上启动了被测软件 WebApp 并且它启动了成功。

被测软件包含后台/托管服务:

它还使用 Autofac 并启动 SignalR Hub。

在 Gitlab 或 Travis 等远程 CI 服务器上使用 WebApplicationFactory 时,有没有人经历过类似的 issue/problems?

是否因为 WebApplicationFactory.CreateClient() 创建了一个 TestServer 运行 作为本地主机??

使用 WebApplicationFactory 进行测试

为被测软件创建 WebApplicationFactory 并在创建后显示控制台日志消息。当 运行 在 CI 服务器上时,在创建工厂客户端后没有控制台消息显示。

  [Fact]
        public void WebApp_ApiController_DownloadImage()
        {
            Console.WriteLine("TEST WebApp_ApiController_DownloadImage");

            var appFactory = new WebApplicationFactory<WebApp.Startup>()
               .WithWebHostBuilder(builder =>
                  {
                  });

            /** THIS CODE HANGS WHILE Bootsrapping the Services in Startup **/
            /** NO TEST MESSAGE IS DISPLAYED **/
            using (var client = appFactory.CreateClient())
            {
                Console.WriteLine("WE ARE IN THE TEST HERE");
            }
        }

Startup.cs

包含 Kafka 和 Mqtt 的后台服务

public class Startup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public ILifetimeScope AutofacContainer { get; private set; }

        // This method gets called by the runtime. Use this method to add services to the container.
        // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
        public virtual void ConfigureServices(IServiceCollection services)
        {
            services.AddRazorPages();
            services.AddServerSideBlazor();

            services
                .AddCustomConfiguration(Configuration)
                .AddBackgroundServices()
                .AddLogging()
                .AddCustomSignalR();
        }


        // ConfigureContainer is where you can register things directly
        // with Autofac. This runs after ConfigureServices so the things
        // here will override registrations made in ConfigureServices.
        // Don't build the container; that gets done for you by the factory.
        public virtual void ConfigureContainer(ContainerBuilder builder)
        {
            // Register your own things directly with Autofac here. Don't
            // call builder.Populate(), that happens in AutofacServiceProviderFactory
            // for you.
            builder.RegisterModule(new MqttModule());
            builder.RegisterModule(new MotionDetectionRepositoryModule());
            builder.RegisterModule(new KafkaModule());
            builder.RegisterAssemblyTypes(typeof(MotionDetection).GetTypeInfo().Assembly);
        }


        // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
        public virtual void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            else
            {
                app.UseExceptionHandler("/Error");
                // The default HSTS value is 30 days. You may want to change this for production scenarios, see https://aka.ms/aspnetcore-hsts.
                app.UseHsts();
            }

            // app.UseHttpsRedirection();
            app.UseStaticFiles();

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
                endpoints.MapHub<MotionHub>("/motionhub");
                endpoints.MapBlazorHub();
                endpoints.MapFallbackToPage("/_Host");
            });
        }
    }

    static class CustomExtensionsMethods
    {
        /// <summary>
        /// Create background services to:
        /// 1: Create Kafka topic from config if not already created
        /// 2: Mqtt->Kafka Bridge for object detections
        /// 3: Consume Kafka object detections and forward to signalR
        /// </summary>
        /// <param name="services">Service collection</param>
        public static IServiceCollection AddBackgroundServices(this IServiceCollection services)
        {
            services.AddHostedService<KafkaAdminService>();
            services.AddHostedService<MqttKafkaBridge>();
            services.AddHostedService<ConsumerService>();

            return services;
        }

        public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration)
        {
            if (!configuration.GetSection(S3Config.SectionName).Exists())
            {
                throw new InvalidOperationException($"Failed to locate section {S3Config.SectionName} in config file");
            }
            services.Configure<S3Config>(options => configuration.GetSection(S3Config.SectionName).Bind(options));

            if (!configuration.GetSection(MqttConfig.SectionName).Exists())
            {
                throw new InvalidOperationException($"Failed to locate section {MqttConfig.SectionName} in config file");
            }
            services.Configure<MqttConfig>(options => configuration.GetSection(MqttConfig.SectionName).Bind(options));

            if (!configuration.GetSection(KafkaConfig.SectionName).Exists())
            {
                throw new InvalidOperationException($"Failed to locate section {KafkaConfig.SectionName} in config file");
            }
            services.Configure<KafkaConfig>(options => configuration.GetSection(KafkaConfig.SectionName).Bind(options));

            return services;
        }

        public static IServiceCollection AddCustomSignalR(this IServiceCollection services)
        {
            var sp = services.BuildServiceProvider();

            var loggerMD = sp.GetService<ILogger<MotionDetectionConverter>>();
            var loggerMI = sp.GetService<ILogger<MotionInfoConverter>>();
            var loggerJV = sp.GetService<ILogger<JsonVisitor>>();

            services.AddSignalR(o => o.EnableDetailedErrors = true)
               .AddJsonProtocol(options =>
               {
                   options.PayloadSerializerOptions = JsonConvertersFactory.CreateDefaultJsonConverters(loggerMD, loggerMI, loggerJV);
               });

            return services;
        }
    }

向 Kafka 发送创建主题请求的后台服务 - 使用 WebApplicationFactory 测试服务器时阻塞

    public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);
    public class KafkaAdminService : IHostedService
    {
        private KafkaAdminFactory _Factory { get; set; }
        private ILogger<KafkaAdminService> _Logger { get; set; }
        private KafkaConfig _Config { get; set; }


        /// <summary>
        /// Retrieve KafkaConfig from appsettings
        /// </summary>
        /// <param name="config">Config POCO from appsettings file</param>
        /// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
        /// <param name="logger">Logger instance</param>
        public KafkaAdminService(
            IOptions<KafkaConfig> config,
            KafkaAdminFactory clientFactory,
            ILogger<KafkaAdminService> logger)
        {
            if (clientFactory == null)
                throw new ArgumentNullException(nameof(clientFactory));

            if (config == null)
                throw new ArgumentNullException(nameof(config));

            _Config = config.Value ?? throw new ArgumentNullException(nameof(config));
            _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
            _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }


        /// <summary>
        /// Create a Kafka topic if it does not already exist
        /// </summary>
        /// <param name="token">Cancellation token required by IHostedService</param>
        /// <exception name="CreateTopicsException">
        /// Thrown for exceptions encountered except duplicate topic
        /// </exception>
        public async Task StartAsync(CancellationToken token)
        {
            using (var client = _Factory(_Config))
            {
                await CreateTopicAsync(client);
            }
        }

        /// <summary>Dispatch request to Kafka Broker to create Kafka topic from config</summary>
        /// <param name="client">Kafka admin client</param>
        /// <exception cref="">Thrown for errors except topic already exists</exception>
        private async Task CreateTopicAsync(IAdminClient client)
        {
            try
            {
                _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);

                _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
            }
            catch (CreateTopicsException e)
            {
                if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                {
                    _Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                    throw e;
                }
                else
                {
                    _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                }
            }
        }

        /// <summary>No-op</summary>
        /// <param name="token">Cancellation token</param>
        public async Task StopAsync(CancellationToken token) => await Task.CompletedTask;
    }

GitLab 管道运行宁同docker-远程组合堆栈

为管道变量创建 .env 文件并启动 docker-compose stack

stages:
  - build
  - test
  - release

variables:
  DOCKER_DRIVER: overlay2

services:
  - docker:19.03.11-dind

test:
  image: docker/compose:debian-1.27.4
  stage: test
  variables:
    DOCKER_BUILDKIT: 1
    COMPOSE_DOCKER_CLI_BUILD: 1
  before_script:
    - docker login -u $CI_REGISTRY_USER -p $CI_JOB_TOKEN $CI_REGISTRY
  script:
    - cd Docker
    - echo "MQTT_USER=${MQTT_USER}" >> .env
    - echo "MQTT_PASSWORD=${MQTT_PASSWORD}" >> .env
    - echo "MINIO_USER=${MINIO_USER}" >> .env
    - echo "MINIO_PASSWORD=${MINIO_PASSWORD}" >> .env
    - docker-compose -f docker-compose-ci.yml build webapp
    - docker-compose -f docker-compose-ci.yml up --exit-code-from webapp --abort-on-container-exit

GitLab CI 作业输出

已通过 CI

上的调试读取并显示正确的测试环境变量
netclient-run     | .NET Run Web App Ready. Starting WebApp that contains KafkaAdmin background service.
netclient-test    | Giving netclient-run a bit of time to start up…
netclient-run     | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
netclient-run     |       Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
netclient-run     | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
netclient-run     |       No XML encryptor configured. Key {395ba0f4-cde9-49af-8fb4-fd16b9f05bae} may be persisted to storage in unencrypted form.
netclient-run     | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run     |       Admin service trying to create Kafka Topic...
netclient-run     | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run     |       Topic::eventbus, ReplicationCount::1, PartitionCount::3
netclient-run     | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run     |       Bootstrap Servers::kafka:9092
netclient-run     | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run     |       Admin service successfully created topic eventbus
netclient-run     | info: Microsoft.Hosting.Lifetime[0]
netclient-run     |       Now listening on: http://[::]:80
netclient-run     | info: Microsoft.Hosting.Lifetime[0]
netclient-run     |       Application started. Press Ctrl+C to shut down.
netclient-run     | info: Microsoft.Hosting.Lifetime[0]
netclient-run     |       Hosting environment: Docker
netclient-run     | info: Microsoft.Hosting.Lifetime[0]
netclient-run     |       Content root path: /KafkaAdmin/src/KafkaAdmin.WebApp


netclient-test    | .NET Client test container ready. Running test that uses WebApplicationFactory TestServer to start WebApp with KafkaAdmin background service
netclient-test    | This runs successfully in a local development environment on MacOS and Ubuntu Linux 16.04.
netclient-test    | This fails when running on a GitLab CI Server. It can be seen that the test server bootstraps the WebApp.....
netclient-test    | The KafkaAdmin background service blocks when requesting topic creation from the kafka service
netclient-test    | Test run for /KafkaAdmin/tests/KafkaAdmin.Kafka.IntegrationTests/bin/Release/netcoreapp3.1/linux-musl-x64/KafkaAdmin.Kafka.IntegrationTests.dll(.NETCoreApp,Version=v3.1)
netclient-test    | Starting test execution, please wait...
netclient-test    | 
netclient-test    | A total of 1 test files matched the specified pattern.
netclient-test    | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
netclient-test    |       Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
netclient-test    | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
netclient-test    |       No XML encryptor configured. Key {2b234f03-01b4-472d-9621-db8e056db173} may be persisted to storage in unencrypted form.
netclient-test    | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test    |       Admin service trying to create Kafka Topic...
netclient-test    | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test    |       Topic::eventbus, ReplicationCount::1, PartitionCount::3
netclient-test    | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test    |       Bootstrap Servers::kafka:9092

docker-撰写

在同一网络中包含 Kafka、Zookeeper 和 WebApp(源+测试)服务。如果将 WebApp 服务器的命令更改为 运行 被测软件 WebApp,则 运行 在 CI 上成功。只有在远程 GitLan CI 服务器上使用 WebApplicationFactory 测试服务器时才会遇到此问题。

---
version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - camnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN

  kafka:
    image: confluentinc/cp-kafka:6.0.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - camnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_HEAP_OPTS: -Xmx512M -Xms512M
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"

  mqtt:
    image: eclipse-mosquitto:1.6.9
    hostname: mqtt
    container_name: mqtt
    ports:
      - "8883:8883"
      - "1883:1883"
      - "9901:9001"
    networks:
      - camnet
    environment:
      - MOSQUITTO_USERNAME=${MQTT_USER}
      - MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
    volumes:
      - ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
      - ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
      - ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
      - ./Mqtt/Certs/server.key:/mosquitto/config/server.key

  minio:
    image: dcs3spp/minio:version-1.0.2
    hostname: minio
    container_name: minio
    ports:
      - "9000:9000"
    networks:
      - camnet
    environment:
      - MINIO_BUCKET=images
      - MINIO_ACCESS_KEY=${MINIO_USER}
      - MINIO_SECRET_KEY=${MINIO_PASSWORD}

  webapp:
    build:
      context: ../
      dockerfile: Docker/Test/Dockerfile.debian
      target: test
    hostname: webapp
    container_name: webapp
    image: dcs3spp/webapp
    depends_on:
      - kafka
      - minio
      - mqtt
    networks:
      - camnet
    entrypoint: []
    command: >
      /bin/sh -c "
        echo Waiting for kafka service start...;
        while ! nc -z kafka 9092;
        do
          sleep 1;
        done;
        echo Connected!;
        dotnet test ./Tests/FunctionalTests/WebApp.FunctionalTests;
      "
    environment:
      - ASPNETCORE_ENVIRONMENT=Docker
      - ASPNETCORE_URLS=http://+:80
      - MqttSettings__UserName=${MQTT_USER}
      - MqttSettings__Password=${MQTT_PASSWORD}
      - S3Settings__AccessKey=${MINIO_USER}
      - S3Settings__SecretKey=${MINIO_PASSWORD}
    volumes:
      - ../CoverageReports:/CoverageReports

networks:
  camnet:

看完这篇 aspnetcore issue 发现问题出在实现上 我的 IHostedService 实现。

StartAsync 方法正在执行任务,运行 直到请求完成。按照设计,此方法意味着即发即弃,即开始任务然后继续。将我的 KafkaAdmin 服务更新为 BackgroundService,覆盖 ExecuteAsync 方法,如下所列。 随后,测试不再阻塞。

using System;
using System.Threading;
using System.Threading.Tasks;

using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using KafkaAdmin.Kafka.Config;


namespace KafkaAdmin.Kafka
{
    public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);

    /// <summary>Background Service to make a request from Kafka to create a topic</summary>
    public class KafkaAdminService : BackgroundService, IDisposable
    {
        private KafkaAdminFactory _Factory { get; set; }
        private ILogger<KafkaAdminService> _Logger { get; set; }
        private KafkaConfig _Config { get; set; }


        /// <summary>
        /// Retrieve KafkaConfig from appsettings
        /// </summary>
        /// <param name="config">Config POCO from appsettings file</param>
        /// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
        /// <param name="logger">Logger instance</param>
        public KafkaAdminService(
            IOptions<KafkaConfig> config,
            KafkaAdminFactory clientFactory,
            ILogger<KafkaAdminService> logger)
        {
            if (clientFactory == null)
                throw new ArgumentNullException(nameof(clientFactory));

            if (config == null)
                throw new ArgumentNullException(nameof(config));

            _Config = config.Value ?? throw new ArgumentNullException(nameof(config));
            _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
            _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }


        /// <summary>
        /// Create a Kafka topic if it does not already exist
        /// </summary>
        /// <param name="token">Cancellation token required by IHostedService</param>
        /// <exception name="CreateTopicsException">
        /// Thrown for exceptions encountered except duplicate topic
        /// </exception>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using (var client = _Factory(_Config))
            {
                try
                {
                    _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                    _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                    _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                    await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);

                    _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
                }
                catch (CreateTopicsException e)
                {
                    if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                    {
                        _Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                        throw e;
                    }
                    else
                    {
                        _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                    }
                }
            }

            _Logger.LogInformation("Kafka Consumer thread started");

            await Task.CompletedTask;
        }


        /// <summary>
        /// Call base class dispose
        /// </summary>
        public override void Dispose()
        {
            base.Dispose();
        }
    }
}

对于live WebApp启动成功的原因,还是很疑惑。为什么这只是 TestServer 的问题?