ASP.NET 核心 WebApplicationFactory CreateClient 引导服务器在 GitLab CI 上阻塞 运行 在 Docker Compose 环境中
ASP.NET Core WebApplicationFactory CreateClient Booting Server Blocking on GitLab CI When Run In Docker Compose Environment
我的 ASP.NET Core 3.1 API 有一个功能测试项目,它使用 WebApplicationFactory 为 API 创建一个测试服务器。测试在本地和本地 docker-compose 环境中通过。
然而,当 运行 在同一 docker-compose 环境中的 GitLab CI 服务器上时,WebApplication 工厂的 CreateClient
方法被阻塞。当测试服务器启动时,汇合的 Kafka 管理服务被阻塞,即没有确认日志消息显示确认创建的主题。我在 GitLab 上创建了一个小 project 来强调这个问题。
这个问题似乎与 WebApplication 测试服务器和 Confluent Kafka 不知何故有关,因为我已经创建了一个 docker-compose 堆栈,它在 GitLab CI 上启动了被测软件 WebApp 并且它启动了成功。
被测软件包含后台/托管服务:
- 用于创建主题的 Kafka 管理服务 - 当使用 WebAppicationFactory 测试服务器
时,这会在 CI 服务器上阻塞
- 卡夫卡消费者
- MqttKafkaBridge
它还使用 Autofac 并启动 SignalR Hub。
在 Gitlab 或 Travis 等远程 CI 服务器上使用 WebApplicationFactory
时,有没有人经历过类似的 issue/problems?
是否因为 WebApplicationFactory.CreateClient()
创建了一个 TestServer
运行 作为本地主机??
使用 WebApplicationFactory 进行测试
为被测软件创建 WebApplicationFactory 并在创建后显示控制台日志消息。当 运行 在 CI 服务器上时,在创建工厂客户端后没有控制台消息显示。
[Fact]
public void WebApp_ApiController_DownloadImage()
{
Console.WriteLine("TEST WebApp_ApiController_DownloadImage");
var appFactory = new WebApplicationFactory<WebApp.Startup>()
.WithWebHostBuilder(builder =>
{
});
/** THIS CODE HANGS WHILE Bootsrapping the Services in Startup **/
/** NO TEST MESSAGE IS DISPLAYED **/
using (var client = appFactory.CreateClient())
{
Console.WriteLine("WE ARE IN THE TEST HERE");
}
}
Startup.cs
包含 Kafka 和 Mqtt 的后台服务
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public ILifetimeScope AutofacContainer { get; private set; }
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public virtual void ConfigureServices(IServiceCollection services)
{
services.AddRazorPages();
services.AddServerSideBlazor();
services
.AddCustomConfiguration(Configuration)
.AddBackgroundServices()
.AddLogging()
.AddCustomSignalR();
}
// ConfigureContainer is where you can register things directly
// with Autofac. This runs after ConfigureServices so the things
// here will override registrations made in ConfigureServices.
// Don't build the container; that gets done for you by the factory.
public virtual void ConfigureContainer(ContainerBuilder builder)
{
// Register your own things directly with Autofac here. Don't
// call builder.Populate(), that happens in AutofacServiceProviderFactory
// for you.
builder.RegisterModule(new MqttModule());
builder.RegisterModule(new MotionDetectionRepositoryModule());
builder.RegisterModule(new KafkaModule());
builder.RegisterAssemblyTypes(typeof(MotionDetection).GetTypeInfo().Assembly);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public virtual void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
else
{
app.UseExceptionHandler("/Error");
// The default HSTS value is 30 days. You may want to change this for production scenarios, see https://aka.ms/aspnetcore-hsts.
app.UseHsts();
}
// app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
endpoints.MapHub<MotionHub>("/motionhub");
endpoints.MapBlazorHub();
endpoints.MapFallbackToPage("/_Host");
});
}
}
static class CustomExtensionsMethods
{
/// <summary>
/// Create background services to:
/// 1: Create Kafka topic from config if not already created
/// 2: Mqtt->Kafka Bridge for object detections
/// 3: Consume Kafka object detections and forward to signalR
/// </summary>
/// <param name="services">Service collection</param>
public static IServiceCollection AddBackgroundServices(this IServiceCollection services)
{
services.AddHostedService<KafkaAdminService>();
services.AddHostedService<MqttKafkaBridge>();
services.AddHostedService<ConsumerService>();
return services;
}
public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration)
{
if (!configuration.GetSection(S3Config.SectionName).Exists())
{
throw new InvalidOperationException($"Failed to locate section {S3Config.SectionName} in config file");
}
services.Configure<S3Config>(options => configuration.GetSection(S3Config.SectionName).Bind(options));
if (!configuration.GetSection(MqttConfig.SectionName).Exists())
{
throw new InvalidOperationException($"Failed to locate section {MqttConfig.SectionName} in config file");
}
services.Configure<MqttConfig>(options => configuration.GetSection(MqttConfig.SectionName).Bind(options));
if (!configuration.GetSection(KafkaConfig.SectionName).Exists())
{
throw new InvalidOperationException($"Failed to locate section {KafkaConfig.SectionName} in config file");
}
services.Configure<KafkaConfig>(options => configuration.GetSection(KafkaConfig.SectionName).Bind(options));
return services;
}
public static IServiceCollection AddCustomSignalR(this IServiceCollection services)
{
var sp = services.BuildServiceProvider();
var loggerMD = sp.GetService<ILogger<MotionDetectionConverter>>();
var loggerMI = sp.GetService<ILogger<MotionInfoConverter>>();
var loggerJV = sp.GetService<ILogger<JsonVisitor>>();
services.AddSignalR(o => o.EnableDetailedErrors = true)
.AddJsonProtocol(options =>
{
options.PayloadSerializerOptions = JsonConvertersFactory.CreateDefaultJsonConverters(loggerMD, loggerMI, loggerJV);
});
return services;
}
}
向 Kafka 发送创建主题请求的后台服务 - 使用 WebApplicationFactory 测试服务器时阻塞
public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);
public class KafkaAdminService : IHostedService
{
private KafkaAdminFactory _Factory { get; set; }
private ILogger<KafkaAdminService> _Logger { get; set; }
private KafkaConfig _Config { get; set; }
/// <summary>
/// Retrieve KafkaConfig from appsettings
/// </summary>
/// <param name="config">Config POCO from appsettings file</param>
/// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
/// <param name="logger">Logger instance</param>
public KafkaAdminService(
IOptions<KafkaConfig> config,
KafkaAdminFactory clientFactory,
ILogger<KafkaAdminService> logger)
{
if (clientFactory == null)
throw new ArgumentNullException(nameof(clientFactory));
if (config == null)
throw new ArgumentNullException(nameof(config));
_Config = config.Value ?? throw new ArgumentNullException(nameof(config));
_Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
_Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Create a Kafka topic if it does not already exist
/// </summary>
/// <param name="token">Cancellation token required by IHostedService</param>
/// <exception name="CreateTopicsException">
/// Thrown for exceptions encountered except duplicate topic
/// </exception>
public async Task StartAsync(CancellationToken token)
{
using (var client = _Factory(_Config))
{
await CreateTopicAsync(client);
}
}
/// <summary>Dispatch request to Kafka Broker to create Kafka topic from config</summary>
/// <param name="client">Kafka admin client</param>
/// <exception cref="">Thrown for errors except topic already exists</exception>
private async Task CreateTopicAsync(IAdminClient client)
{
try
{
_Logger.LogInformation("Admin service trying to create Kafka Topic...");
_Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
_Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");
await client.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification {
Name = _Config.Topic.Name,
NumPartitions = _Config.Topic.PartitionCount,
ReplicationFactor = _Config.Topic.ReplicationCount
}
}, null);
_Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
_Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
throw e;
}
else
{
_Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
}
}
}
/// <summary>No-op</summary>
/// <param name="token">Cancellation token</param>
public async Task StopAsync(CancellationToken token) => await Task.CompletedTask;
}
GitLab 管道运行宁同docker-远程组合堆栈
为管道变量创建 .env 文件并启动 docker-compose stack
stages:
- build
- test
- release
variables:
DOCKER_DRIVER: overlay2
services:
- docker:19.03.11-dind
test:
image: docker/compose:debian-1.27.4
stage: test
variables:
DOCKER_BUILDKIT: 1
COMPOSE_DOCKER_CLI_BUILD: 1
before_script:
- docker login -u $CI_REGISTRY_USER -p $CI_JOB_TOKEN $CI_REGISTRY
script:
- cd Docker
- echo "MQTT_USER=${MQTT_USER}" >> .env
- echo "MQTT_PASSWORD=${MQTT_PASSWORD}" >> .env
- echo "MINIO_USER=${MINIO_USER}" >> .env
- echo "MINIO_PASSWORD=${MINIO_PASSWORD}" >> .env
- docker-compose -f docker-compose-ci.yml build webapp
- docker-compose -f docker-compose-ci.yml up --exit-code-from webapp --abort-on-container-exit
GitLab CI 作业输出
已通过 CI
上的调试读取并显示正确的测试环境变量
netclient-run | .NET Run Web App Ready. Starting WebApp that contains KafkaAdmin background service.
netclient-test | Giving netclient-run a bit of time to start up…
netclient-run | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
netclient-run | Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
netclient-run | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
netclient-run | No XML encryptor configured. Key {395ba0f4-cde9-49af-8fb4-fd16b9f05bae} may be persisted to storage in unencrypted form.
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Admin service trying to create Kafka Topic...
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Topic::eventbus, ReplicationCount::1, PartitionCount::3
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Bootstrap Servers::kafka:9092
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Admin service successfully created topic eventbus
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Now listening on: http://[::]:80
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Application started. Press Ctrl+C to shut down.
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Hosting environment: Docker
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Content root path: /KafkaAdmin/src/KafkaAdmin.WebApp
netclient-test | .NET Client test container ready. Running test that uses WebApplicationFactory TestServer to start WebApp with KafkaAdmin background service
netclient-test | This runs successfully in a local development environment on MacOS and Ubuntu Linux 16.04.
netclient-test | This fails when running on a GitLab CI Server. It can be seen that the test server bootstraps the WebApp.....
netclient-test | The KafkaAdmin background service blocks when requesting topic creation from the kafka service
netclient-test | Test run for /KafkaAdmin/tests/KafkaAdmin.Kafka.IntegrationTests/bin/Release/netcoreapp3.1/linux-musl-x64/KafkaAdmin.Kafka.IntegrationTests.dll(.NETCoreApp,Version=v3.1)
netclient-test | Starting test execution, please wait...
netclient-test |
netclient-test | A total of 1 test files matched the specified pattern.
netclient-test | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
netclient-test | Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
netclient-test | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
netclient-test | No XML encryptor configured. Key {2b234f03-01b4-472d-9621-db8e056db173} may be persisted to storage in unencrypted form.
netclient-test | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test | Admin service trying to create Kafka Topic...
netclient-test | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test | Topic::eventbus, ReplicationCount::1, PartitionCount::3
netclient-test | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test | Bootstrap Servers::kafka:9092
docker-撰写
在同一网络中包含 Kafka、Zookeeper 和 WebApp(源+测试)服务。如果将 WebApp 服务器的命令更改为 运行 被测软件 WebApp,则 运行 在 CI 上成功。只有在远程 GitLan CI 服务器上使用 WebApplicationFactory 测试服务器时才会遇到此问题。
---
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- camnet
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN
kafka:
image: confluentinc/cp-kafka:6.0.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
networks:
- camnet
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 3
KAFKA_HEAP_OPTS: -Xmx512M -Xms512M
KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"
mqtt:
image: eclipse-mosquitto:1.6.9
hostname: mqtt
container_name: mqtt
ports:
- "8883:8883"
- "1883:1883"
- "9901:9001"
networks:
- camnet
environment:
- MOSQUITTO_USERNAME=${MQTT_USER}
- MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
volumes:
- ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
- ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
- ./Mqtt/Certs/server.key:/mosquitto/config/server.key
minio:
image: dcs3spp/minio:version-1.0.2
hostname: minio
container_name: minio
ports:
- "9000:9000"
networks:
- camnet
environment:
- MINIO_BUCKET=images
- MINIO_ACCESS_KEY=${MINIO_USER}
- MINIO_SECRET_KEY=${MINIO_PASSWORD}
webapp:
build:
context: ../
dockerfile: Docker/Test/Dockerfile.debian
target: test
hostname: webapp
container_name: webapp
image: dcs3spp/webapp
depends_on:
- kafka
- minio
- mqtt
networks:
- camnet
entrypoint: []
command: >
/bin/sh -c "
echo Waiting for kafka service start...;
while ! nc -z kafka 9092;
do
sleep 1;
done;
echo Connected!;
dotnet test ./Tests/FunctionalTests/WebApp.FunctionalTests;
"
environment:
- ASPNETCORE_ENVIRONMENT=Docker
- ASPNETCORE_URLS=http://+:80
- MqttSettings__UserName=${MQTT_USER}
- MqttSettings__Password=${MQTT_PASSWORD}
- S3Settings__AccessKey=${MINIO_USER}
- S3Settings__SecretKey=${MINIO_PASSWORD}
volumes:
- ../CoverageReports:/CoverageReports
networks:
camnet:
看完这篇 aspnetcore issue 发现问题出在实现上
我的 IHostedService
实现。
StartAsync
方法正在执行任务,运行 直到请求完成。按照设计,此方法意味着即发即弃,即开始任务然后继续。将我的 KafkaAdmin
服务更新为 BackgroundService
,覆盖 ExecuteAsync
方法,如下所列。
随后,测试不再阻塞。
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using KafkaAdmin.Kafka.Config;
namespace KafkaAdmin.Kafka
{
public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);
/// <summary>Background Service to make a request from Kafka to create a topic</summary>
public class KafkaAdminService : BackgroundService, IDisposable
{
private KafkaAdminFactory _Factory { get; set; }
private ILogger<KafkaAdminService> _Logger { get; set; }
private KafkaConfig _Config { get; set; }
/// <summary>
/// Retrieve KafkaConfig from appsettings
/// </summary>
/// <param name="config">Config POCO from appsettings file</param>
/// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
/// <param name="logger">Logger instance</param>
public KafkaAdminService(
IOptions<KafkaConfig> config,
KafkaAdminFactory clientFactory,
ILogger<KafkaAdminService> logger)
{
if (clientFactory == null)
throw new ArgumentNullException(nameof(clientFactory));
if (config == null)
throw new ArgumentNullException(nameof(config));
_Config = config.Value ?? throw new ArgumentNullException(nameof(config));
_Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
_Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Create a Kafka topic if it does not already exist
/// </summary>
/// <param name="token">Cancellation token required by IHostedService</param>
/// <exception name="CreateTopicsException">
/// Thrown for exceptions encountered except duplicate topic
/// </exception>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using (var client = _Factory(_Config))
{
try
{
_Logger.LogInformation("Admin service trying to create Kafka Topic...");
_Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
_Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");
await client.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification {
Name = _Config.Topic.Name,
NumPartitions = _Config.Topic.PartitionCount,
ReplicationFactor = _Config.Topic.ReplicationCount
}
}, null);
_Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
_Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
throw e;
}
else
{
_Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
}
}
}
_Logger.LogInformation("Kafka Consumer thread started");
await Task.CompletedTask;
}
/// <summary>
/// Call base class dispose
/// </summary>
public override void Dispose()
{
base.Dispose();
}
}
}
对于live WebApp启动成功的原因,还是很疑惑。为什么这只是 TestServer 的问题?
我的 ASP.NET Core 3.1 API 有一个功能测试项目,它使用 WebApplicationFactory 为 API 创建一个测试服务器。测试在本地和本地 docker-compose 环境中通过。
然而,当 运行 在同一 docker-compose 环境中的 GitLab CI 服务器上时,WebApplication 工厂的 CreateClient
方法被阻塞。当测试服务器启动时,汇合的 Kafka 管理服务被阻塞,即没有确认日志消息显示确认创建的主题。我在 GitLab 上创建了一个小 project 来强调这个问题。
这个问题似乎与 WebApplication 测试服务器和 Confluent Kafka 不知何故有关,因为我已经创建了一个 docker-compose 堆栈,它在 GitLab CI 上启动了被测软件 WebApp 并且它启动了成功。
被测软件包含后台/托管服务:
- 用于创建主题的 Kafka 管理服务 - 当使用 WebAppicationFactory 测试服务器 时,这会在 CI 服务器上阻塞
- 卡夫卡消费者
- MqttKafkaBridge
它还使用 Autofac 并启动 SignalR Hub。
在 Gitlab 或 Travis 等远程 CI 服务器上使用 WebApplicationFactory
时,有没有人经历过类似的 issue/problems?
是否因为 WebApplicationFactory.CreateClient()
创建了一个 TestServer
运行 作为本地主机??
使用 WebApplicationFactory 进行测试
为被测软件创建 WebApplicationFactory 并在创建后显示控制台日志消息。当 运行 在 CI 服务器上时,在创建工厂客户端后没有控制台消息显示。
[Fact]
public void WebApp_ApiController_DownloadImage()
{
Console.WriteLine("TEST WebApp_ApiController_DownloadImage");
var appFactory = new WebApplicationFactory<WebApp.Startup>()
.WithWebHostBuilder(builder =>
{
});
/** THIS CODE HANGS WHILE Bootsrapping the Services in Startup **/
/** NO TEST MESSAGE IS DISPLAYED **/
using (var client = appFactory.CreateClient())
{
Console.WriteLine("WE ARE IN THE TEST HERE");
}
}
Startup.cs
包含 Kafka 和 Mqtt 的后台服务
public class Startup
{
public Startup(IConfiguration configuration)
{
Configuration = configuration;
}
public IConfiguration Configuration { get; }
public ILifetimeScope AutofacContainer { get; private set; }
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public virtual void ConfigureServices(IServiceCollection services)
{
services.AddRazorPages();
services.AddServerSideBlazor();
services
.AddCustomConfiguration(Configuration)
.AddBackgroundServices()
.AddLogging()
.AddCustomSignalR();
}
// ConfigureContainer is where you can register things directly
// with Autofac. This runs after ConfigureServices so the things
// here will override registrations made in ConfigureServices.
// Don't build the container; that gets done for you by the factory.
public virtual void ConfigureContainer(ContainerBuilder builder)
{
// Register your own things directly with Autofac here. Don't
// call builder.Populate(), that happens in AutofacServiceProviderFactory
// for you.
builder.RegisterModule(new MqttModule());
builder.RegisterModule(new MotionDetectionRepositoryModule());
builder.RegisterModule(new KafkaModule());
builder.RegisterAssemblyTypes(typeof(MotionDetection).GetTypeInfo().Assembly);
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public virtual void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
else
{
app.UseExceptionHandler("/Error");
// The default HSTS value is 30 days. You may want to change this for production scenarios, see https://aka.ms/aspnetcore-hsts.
app.UseHsts();
}
// app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
endpoints.MapHub<MotionHub>("/motionhub");
endpoints.MapBlazorHub();
endpoints.MapFallbackToPage("/_Host");
});
}
}
static class CustomExtensionsMethods
{
/// <summary>
/// Create background services to:
/// 1: Create Kafka topic from config if not already created
/// 2: Mqtt->Kafka Bridge for object detections
/// 3: Consume Kafka object detections and forward to signalR
/// </summary>
/// <param name="services">Service collection</param>
public static IServiceCollection AddBackgroundServices(this IServiceCollection services)
{
services.AddHostedService<KafkaAdminService>();
services.AddHostedService<MqttKafkaBridge>();
services.AddHostedService<ConsumerService>();
return services;
}
public static IServiceCollection AddCustomConfiguration(this IServiceCollection services, IConfiguration configuration)
{
if (!configuration.GetSection(S3Config.SectionName).Exists())
{
throw new InvalidOperationException($"Failed to locate section {S3Config.SectionName} in config file");
}
services.Configure<S3Config>(options => configuration.GetSection(S3Config.SectionName).Bind(options));
if (!configuration.GetSection(MqttConfig.SectionName).Exists())
{
throw new InvalidOperationException($"Failed to locate section {MqttConfig.SectionName} in config file");
}
services.Configure<MqttConfig>(options => configuration.GetSection(MqttConfig.SectionName).Bind(options));
if (!configuration.GetSection(KafkaConfig.SectionName).Exists())
{
throw new InvalidOperationException($"Failed to locate section {KafkaConfig.SectionName} in config file");
}
services.Configure<KafkaConfig>(options => configuration.GetSection(KafkaConfig.SectionName).Bind(options));
return services;
}
public static IServiceCollection AddCustomSignalR(this IServiceCollection services)
{
var sp = services.BuildServiceProvider();
var loggerMD = sp.GetService<ILogger<MotionDetectionConverter>>();
var loggerMI = sp.GetService<ILogger<MotionInfoConverter>>();
var loggerJV = sp.GetService<ILogger<JsonVisitor>>();
services.AddSignalR(o => o.EnableDetailedErrors = true)
.AddJsonProtocol(options =>
{
options.PayloadSerializerOptions = JsonConvertersFactory.CreateDefaultJsonConverters(loggerMD, loggerMI, loggerJV);
});
return services;
}
}
向 Kafka 发送创建主题请求的后台服务 - 使用 WebApplicationFactory 测试服务器时阻塞
public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);
public class KafkaAdminService : IHostedService
{
private KafkaAdminFactory _Factory { get; set; }
private ILogger<KafkaAdminService> _Logger { get; set; }
private KafkaConfig _Config { get; set; }
/// <summary>
/// Retrieve KafkaConfig from appsettings
/// </summary>
/// <param name="config">Config POCO from appsettings file</param>
/// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
/// <param name="logger">Logger instance</param>
public KafkaAdminService(
IOptions<KafkaConfig> config,
KafkaAdminFactory clientFactory,
ILogger<KafkaAdminService> logger)
{
if (clientFactory == null)
throw new ArgumentNullException(nameof(clientFactory));
if (config == null)
throw new ArgumentNullException(nameof(config));
_Config = config.Value ?? throw new ArgumentNullException(nameof(config));
_Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
_Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Create a Kafka topic if it does not already exist
/// </summary>
/// <param name="token">Cancellation token required by IHostedService</param>
/// <exception name="CreateTopicsException">
/// Thrown for exceptions encountered except duplicate topic
/// </exception>
public async Task StartAsync(CancellationToken token)
{
using (var client = _Factory(_Config))
{
await CreateTopicAsync(client);
}
}
/// <summary>Dispatch request to Kafka Broker to create Kafka topic from config</summary>
/// <param name="client">Kafka admin client</param>
/// <exception cref="">Thrown for errors except topic already exists</exception>
private async Task CreateTopicAsync(IAdminClient client)
{
try
{
_Logger.LogInformation("Admin service trying to create Kafka Topic...");
_Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
_Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");
await client.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification {
Name = _Config.Topic.Name,
NumPartitions = _Config.Topic.PartitionCount,
ReplicationFactor = _Config.Topic.ReplicationCount
}
}, null);
_Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
_Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
throw e;
}
else
{
_Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
}
}
}
/// <summary>No-op</summary>
/// <param name="token">Cancellation token</param>
public async Task StopAsync(CancellationToken token) => await Task.CompletedTask;
}
GitLab 管道运行宁同docker-远程组合堆栈
为管道变量创建 .env 文件并启动 docker-compose stack
stages:
- build
- test
- release
variables:
DOCKER_DRIVER: overlay2
services:
- docker:19.03.11-dind
test:
image: docker/compose:debian-1.27.4
stage: test
variables:
DOCKER_BUILDKIT: 1
COMPOSE_DOCKER_CLI_BUILD: 1
before_script:
- docker login -u $CI_REGISTRY_USER -p $CI_JOB_TOKEN $CI_REGISTRY
script:
- cd Docker
- echo "MQTT_USER=${MQTT_USER}" >> .env
- echo "MQTT_PASSWORD=${MQTT_PASSWORD}" >> .env
- echo "MINIO_USER=${MINIO_USER}" >> .env
- echo "MINIO_PASSWORD=${MINIO_PASSWORD}" >> .env
- docker-compose -f docker-compose-ci.yml build webapp
- docker-compose -f docker-compose-ci.yml up --exit-code-from webapp --abort-on-container-exit
GitLab CI 作业输出
已通过 CI
上的调试读取并显示正确的测试环境变量netclient-run | .NET Run Web App Ready. Starting WebApp that contains KafkaAdmin background service.
netclient-test | Giving netclient-run a bit of time to start up…
netclient-run | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
netclient-run | Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
netclient-run | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
netclient-run | No XML encryptor configured. Key {395ba0f4-cde9-49af-8fb4-fd16b9f05bae} may be persisted to storage in unencrypted form.
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Admin service trying to create Kafka Topic...
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Topic::eventbus, ReplicationCount::1, PartitionCount::3
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Bootstrap Servers::kafka:9092
netclient-run | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-run | Admin service successfully created topic eventbus
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Now listening on: http://[::]:80
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Application started. Press Ctrl+C to shut down.
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Hosting environment: Docker
netclient-run | info: Microsoft.Hosting.Lifetime[0]
netclient-run | Content root path: /KafkaAdmin/src/KafkaAdmin.WebApp
netclient-test | .NET Client test container ready. Running test that uses WebApplicationFactory TestServer to start WebApp with KafkaAdmin background service
netclient-test | This runs successfully in a local development environment on MacOS and Ubuntu Linux 16.04.
netclient-test | This fails when running on a GitLab CI Server. It can be seen that the test server bootstraps the WebApp.....
netclient-test | The KafkaAdmin background service blocks when requesting topic creation from the kafka service
netclient-test | Test run for /KafkaAdmin/tests/KafkaAdmin.Kafka.IntegrationTests/bin/Release/netcoreapp3.1/linux-musl-x64/KafkaAdmin.Kafka.IntegrationTests.dll(.NETCoreApp,Version=v3.1)
netclient-test | Starting test execution, please wait...
netclient-test |
netclient-test | A total of 1 test files matched the specified pattern.
netclient-test | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
netclient-test | Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
netclient-test | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
netclient-test | No XML encryptor configured. Key {2b234f03-01b4-472d-9621-db8e056db173} may be persisted to storage in unencrypted form.
netclient-test | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test | Admin service trying to create Kafka Topic...
netclient-test | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test | Topic::eventbus, ReplicationCount::1, PartitionCount::3
netclient-test | info: KafkaAdmin.Kafka.KafkaAdminService[0]
netclient-test | Bootstrap Servers::kafka:9092
docker-撰写
在同一网络中包含 Kafka、Zookeeper 和 WebApp(源+测试)服务。如果将 WebApp 服务器的命令更改为 运行 被测软件 WebApp,则 运行 在 CI 上成功。只有在远程 GitLan CI 服务器上使用 WebApplicationFactory 测试服务器时才会遇到此问题。
---
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
- camnet
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN
kafka:
image: confluentinc/cp-kafka:6.0.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
networks:
- camnet
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_NUM_PARTITIONS: 3
KAFKA_HEAP_OPTS: -Xmx512M -Xms512M
KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"
mqtt:
image: eclipse-mosquitto:1.6.9
hostname: mqtt
container_name: mqtt
ports:
- "8883:8883"
- "1883:1883"
- "9901:9001"
networks:
- camnet
environment:
- MOSQUITTO_USERNAME=${MQTT_USER}
- MOSQUITTO_PASSWORD=${MQTT_PASSWORD}
volumes:
- ./Mqtt/Config/mosquitto.conf:/mosquitto/config/mosquitto.conf
- ./Mqtt/Certs/localCA.crt:/mosquitto/config/ca.crt
- ./Mqtt/Certs/server.crt:/mosquitto/config/server.crt
- ./Mqtt/Certs/server.key:/mosquitto/config/server.key
minio:
image: dcs3spp/minio:version-1.0.2
hostname: minio
container_name: minio
ports:
- "9000:9000"
networks:
- camnet
environment:
- MINIO_BUCKET=images
- MINIO_ACCESS_KEY=${MINIO_USER}
- MINIO_SECRET_KEY=${MINIO_PASSWORD}
webapp:
build:
context: ../
dockerfile: Docker/Test/Dockerfile.debian
target: test
hostname: webapp
container_name: webapp
image: dcs3spp/webapp
depends_on:
- kafka
- minio
- mqtt
networks:
- camnet
entrypoint: []
command: >
/bin/sh -c "
echo Waiting for kafka service start...;
while ! nc -z kafka 9092;
do
sleep 1;
done;
echo Connected!;
dotnet test ./Tests/FunctionalTests/WebApp.FunctionalTests;
"
environment:
- ASPNETCORE_ENVIRONMENT=Docker
- ASPNETCORE_URLS=http://+:80
- MqttSettings__UserName=${MQTT_USER}
- MqttSettings__Password=${MQTT_PASSWORD}
- S3Settings__AccessKey=${MINIO_USER}
- S3Settings__SecretKey=${MINIO_PASSWORD}
volumes:
- ../CoverageReports:/CoverageReports
networks:
camnet:
看完这篇 aspnetcore issue 发现问题出在实现上
我的 IHostedService
实现。
StartAsync
方法正在执行任务,运行 直到请求完成。按照设计,此方法意味着即发即弃,即开始任务然后继续。将我的 KafkaAdmin
服务更新为 BackgroundService
,覆盖 ExecuteAsync
方法,如下所列。
随后,测试不再阻塞。
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using KafkaAdmin.Kafka.Config;
namespace KafkaAdmin.Kafka
{
public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);
/// <summary>Background Service to make a request from Kafka to create a topic</summary>
public class KafkaAdminService : BackgroundService, IDisposable
{
private KafkaAdminFactory _Factory { get; set; }
private ILogger<KafkaAdminService> _Logger { get; set; }
private KafkaConfig _Config { get; set; }
/// <summary>
/// Retrieve KafkaConfig from appsettings
/// </summary>
/// <param name="config">Config POCO from appsettings file</param>
/// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
/// <param name="logger">Logger instance</param>
public KafkaAdminService(
IOptions<KafkaConfig> config,
KafkaAdminFactory clientFactory,
ILogger<KafkaAdminService> logger)
{
if (clientFactory == null)
throw new ArgumentNullException(nameof(clientFactory));
if (config == null)
throw new ArgumentNullException(nameof(config));
_Config = config.Value ?? throw new ArgumentNullException(nameof(config));
_Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
_Logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
/// <summary>
/// Create a Kafka topic if it does not already exist
/// </summary>
/// <param name="token">Cancellation token required by IHostedService</param>
/// <exception name="CreateTopicsException">
/// Thrown for exceptions encountered except duplicate topic
/// </exception>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
using (var client = _Factory(_Config))
{
try
{
_Logger.LogInformation("Admin service trying to create Kafka Topic...");
_Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
_Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");
await client.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification {
Name = _Config.Topic.Name,
NumPartitions = _Config.Topic.PartitionCount,
ReplicationFactor = _Config.Topic.ReplicationCount
}
}, null);
_Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
_Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
throw e;
}
else
{
_Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
}
}
}
_Logger.LogInformation("Kafka Consumer thread started");
await Task.CompletedTask;
}
/// <summary>
/// Call base class dispose
/// </summary>
public override void Dispose()
{
base.Dispose();
}
}
}
对于live WebApp启动成功的原因,还是很疑惑。为什么这只是 TestServer 的问题?