What Is The Correct Usage of Confluent Kafka Client Within docker-compose Stack On A Cloud CI Server Such As GitLab or Travis?

I am new to Kafka and have managed to get a docker-compose stack working locally so that functional tests run successfully against an ASP.NET Core 3.1 test service. The test service lives in the same docker-compose stack, on the same network, as the Kafka, Zookeeper and Rest-Proxy services.

The SUT and the tests use the .NET Core Client to create the topic at startup if it does not already exist.

As soon as I try to run this docker-compose stack on the remote GitLab.com CI server, the tests hang while creating the topic. The logs (see below) show the .NET client connecting to kafka:19092, the correct internal service within the docker-compose stack. There is some activity in the kafka service as it starts to create the topic, and then it blocks. I would expect to see a message in the logs confirming that the topic was created.

.NET client code that creates the Kafka topic

        /// <summary>Dispatch request to Kafka Broker to create Kafka topic from config</summary>
        /// <param name="client">Kafka admin client</param>
        /// <exception cref="CreateTopicsException">Thrown for errors except topic already exists</exception>
        private async Task CreateTopicAsync(IAdminClient client)
        {
            try
            {
                _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);

                _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
            }
            catch (CreateTopicsException e)
            {
                if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                {
                    _Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                    throw e;
                }
                else
                {
                    _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                }
            }
        }
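
The IAdminClient passed into this method is created elsewhere in the service. As a minimal sketch, assuming the bootstrap servers come from the same config value that is logged above (the AdminClientFactory/BuildAdminClient names are illustrative, not from the project), the admin client can be built with the Confluent .NET client like this:

using Confluent.Kafka;

// Illustrative sketch only, not project source: builds an IAdminClient for
// the stack's internal listener, e.g. BuildAdminClient("kafka:19092").
public static class AdminClientFactory
{
    public static IAdminClient BuildAdminClient(string bootstrapServers)
    {
        var config = new AdminClientConfig { BootstrapServers = bootstrapServers };
        return new AdminClientBuilder(config).Build();
    }
}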

How should the kafka broker and rest-proxy docker-compose services be configured to run on an external CI server? Is this possible?

Details of the docker-compose stack and the GitLab CI environment are included below...

Create the docker network

docker network create --gateway 172.19.0.1 --subnet 172.19.0.0/16 broker

docker-compose stack with zookeeper, kafka, rest-proxy and the ASP.NET Core 3.1 service under integration test

---
version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      camnet:
        ipv4_address: 172.19.0.11
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:6.0.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    networks:
      camnet:
        ipv4_address: 172.19.0.21
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092 
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_BROKER_ID: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR : 1
      KAFKA_NUM_PARTITIONS: 3

  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.0.0
    hostname: rest-proxy
    container_name: rest-proxy
    depends_on:
      - kafka
    ports:
      - 8082:8082
    networks:
      camnet:
        ipv4_address: 172.19.0.31
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:19092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    
  # ASP.NET Core 3 integration tests
  # uses kafka dotnet client https://docs.confluent.io/current/clients/dotnet.html
  # to create topics from an ASP.NET Core Test Server
  webapp:
    build:
      context: ../
      dockerfile: Docker/Test/Dockerfile
      target: test
    hostname: webapp
    image: dcs3spp/webapp
    container_name: webapp
    depends_on:
      - kafka
      - rest-proxy
    networks:
      camnet:
        ipv4_address: 172.19.0.61
    environment:
      - ASPNETCORE_ENVIRONMENT=Docker

networks:
  camnet:
    external:
      name: broker

GitLab.com CI Docker network environment

$ docker network ls
NETWORK ID          NAME                DRIVER              SCOPE
2b3286d21fee        bridge              bridge              local
a17bf57d1a86        host                host                local
0252525b2ca4        none                null                local
$ docker network inspect bridge
[
    {
        "Name": "bridge",
        "Id": "2b3286d21fee076047e78188b67c2912dfd388a170de3e3cf2ba8d5238e1c6c7",
        "Created": "2020-11-16T14:53:35.574299006Z",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": [
                {
                    "Subnet": "172.18.0.0/16"
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Ingress": false,
        "ConfigFrom": {
            "Network": ""
        },
        "ConfigOnly": false,
        "Containers": {},
        "Options": {
            "com.docker.network.bridge.default_bridge": "true",
            "com.docker.network.bridge.enable_icc": "true",
            "com.docker.network.bridge.enable_ip_masquerade": "true",
            "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
            "com.docker.network.bridge.name": "docker0",
            "com.docker.network.driver.mtu": "1500"
        },
        "Labels": {}
    }
]
$ docker network inspect host
[
    {
        "Name": "host",
        "Id": "a17bf57d1a865512bebd3f7f73e0fd761d40b1d4f87765edeac6099e86b94339",
        "Created": "2020-11-16T14:53:35.551372286Z",
        "Scope": "local",
        "Driver": "host",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": []
        },
        "Internal": false,
        "Attachable": false,
        "Ingress": false,
        "ConfigFrom": {
            "Network": ""
        },
        "ConfigOnly": false,
        "Containers": {},
        "Options": {},
        "Labels": {}
    }
]
$ docker network inspect none
[
    {
        "Name": "none",
        "Id": "0252525b2ca4b28ddc0f950b472485167cfe18e003c62f3d09ce2a856880362a",
        "Created": "2020-11-16T14:53:35.536741983Z",
        "Scope": "local",
        "Driver": "null",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": null,
            "Config": []
        },
        "Internal": false,
        "Attachable": false,
        "Ingress": false,
        "ConfigFrom": {
            "Network": ""
        },
        "ConfigOnly": false,
        "Containers": {},
        "Options": {},
        "Labels": {}
    }
]
$ docker network create --gateway 172.19.0.1 --subnet 172.19.0.0/16 broker
dbd923b4caacca225f52e8a82dfcad184a1652bde1b5976aa07bbddb2919126c

GitLab.com CI server logs

webapp        | A total of 1 test files matched the specified pattern.
webapp        | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
webapp        |       Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
webapp        | info: WebApp.S3.S3Service[0]
webapp        |       Minio client created for endpoint minio:9000
webapp        | info: WebApp.Kafka.ProducerService[0]
webapp        |       ProducerService constructor called
webapp        | info: WebApp.Kafka.SchemaRegistry.Serdes.JsonDeserializer[0]
webapp        |       Constructed
webapp        | info: WebApp.Kafka.ConsumerService[0]
webapp        |       Kafka consumer listening to camera topics =>
webapp        | info: WebApp.Kafka.ConsumerService[0]
webapp        |       Camera Topic :: shinobi/RHSsYfiV6Z/xi5cncrNK6/trigger
webapp        | info: WebApp.Kafka.ConsumerService[0]
webapp        |       Camera Topic :: shinobi/group/monitor/trigger
webapp        | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
webapp        |       No XML encryptor configured. Key {47af6978-c38e-429f-9b34-455ca445c2d8} may be persisted to storage in unencrypted form.
webapp        | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp        |       Admin service trying to create Kafka Topic...
webapp        | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp        |       Topic::eventbus, ReplicationCount::1, PartitionCount::3
webapp        | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp        |       Bootstrap Servers::kafka:19092
kafka         | [2020-11-16 14:59:32,335] INFO Creating topic eventbus with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
kafka         | [2020-11-16 14:59:32,543] INFO [Controller id=1] New topics: [Set(eventbus)], deleted topics: [HashSet()], new partition replica assignment [Map(eventbus-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), eventbus-1 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), eventbus-2 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))] (kafka.controller.KafkaController)
kafka         | [2020-11-16 14:59:32,546] INFO [Controller id=1] New partition creation callback for eventbus-0,eventbus-1,eventbus-2 (kafka.controller.KafkaController)
kafka         | [2020-11-16 14:59:32,557] INFO [Controller id=1 epoch=1] Changed partition eventbus-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,558] INFO [Controller id=1 epoch=1] Changed partition eventbus-1 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,559] INFO [Controller id=1 epoch=1] Changed partition eventbus-2 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,560] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka         | [2020-11-16 14:59:32,613] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-0 from NonExistentReplica to NewReplica (state.change.logger)
kafka         | [2020-11-16 14:59:32,614] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-1 from NonExistentReplica to NewReplica (state.change.logger)
kafka         | [2020-11-16 14:59:32,615] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-2 from NonExistentReplica to NewReplica (state.change.logger)
kafka         | [2020-11-16 14:59:32,616] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka         | [2020-11-16 14:59:32,770] INFO [Controller id=1 epoch=1] Changed partition eventbus-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka         | [2020-11-16 14:59:32,772] INFO [Controller id=1 epoch=1] Changed partition eventbus-1 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka         | [2020-11-16 14:59:32,773] INFO [Controller id=1 epoch=1] Changed partition eventbus-2 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka         | [2020-11-16 14:59:32,804] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-0 (state.change.logger)
kafka         | [2020-11-16 14:59:32,805] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,806] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-2 (state.change.logger)
kafka         | [2020-11-16 14:59:32,808] INFO [Controller id=1 epoch=1] Sending LeaderAndIsr request to broker 1 with 3 become-leader and 0 become-follower partitions (state.change.logger)
kafka         | [2020-11-16 14:59:32,818] INFO [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 for 3 partitions (state.change.logger)
kafka         | [2020-11-16 14:59:32,820] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,821] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,822] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet(1) for 3 partitions (state.change.logger)
kafka         | [2020-11-16 14:59:32,823] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,828] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-0 from NewReplica to OnlineReplica (state.change.logger)
kafka         | [2020-11-16 14:59:32,829] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-1 from NewReplica to OnlineReplica (state.change.logger)
kafka         | [2020-11-16 14:59:32,830] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-2 from NewReplica to OnlineReplica (state.change.logger)
kafka         | [2020-11-16 14:59:32,832] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka         | [2020-11-16 14:59:32,869] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-2 (state.change.logger)
kafka         | [2020-11-16 14:59:32,870] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-1 (state.change.logger)
kafka         | [2020-11-16 14:59:32,871] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-0 (state.change.logger)
kafka         | [2020-11-16 14:59:32,873] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(eventbus-2, eventbus-1, eventbus-0) (kafka.server.ReplicaFetcherManager)
kafka         | [2020-11-16 14:59:32,874] INFO [Broker id=1] Stopped fetchers as part of LeaderAndIsr request correlationId 1 from controller 1 epoch 1 as part of the become-leader transition for 3 partitions (state.change.logger)
kafka         | [2020-11-16 14:59:33,345] INFO [Log partition=eventbus-2, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka         | [2020-11-16 14:59:33,421] INFO Created log for partition eventbus-2 in /var/lib/kafka/data/eventbus-2 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka         | [2020-11-16 14:59:33,424] INFO [Partition eventbus-2 broker=1] No checkpointed highwatermark is found for partition eventbus-2 (kafka.cluster.Partition)
kafka         | [2020-11-16 14:59:33,425] INFO [Partition eventbus-2 broker=1] Log loaded for partition eventbus-2 with initial high watermark 0 (kafka.cluster.Partition)
kafka         | [2020-11-16 14:59:33,429] INFO [Broker id=1] Leader eventbus-2 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka         | [2020-11-16 14:59:33,462] INFO [Log partition=eventbus-1, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka         | [2020-11-16 14:59:33,468] INFO Created log for partition eventbus-1 in /var/lib/kafka/data/eventbus-1 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka         | [2020-11-16 14:59:33,469] INFO [Partition eventbus-1 broker=1] No checkpointed highwatermark is found for partition eventbus-1 (kafka.cluster.Partition)
kafka         | [2020-11-16 14:59:33,470] INFO [Partition eventbus-1 broker=1] Log loaded for partition eventbus-1 with initial high watermark 0 (kafka.cluster.Partition)
kafka         | [2020-11-16 14:59:33,470] INFO [Broker id=1] Leader eventbus-1 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka         | [2020-11-16 14:59:33,484] INFO [Log partition=eventbus-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka         | [2020-11-16 14:59:33,488] INFO Created log for partition eventbus-0 in /var/lib/kafka/data/eventbus-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka         | [2020-11-16 14:59:33,489] INFO [Partition eventbus-0 broker=1] No checkpointed highwatermark is found for partition eventbus-0 (kafka.cluster.Partition)
kafka         | [2020-11-16 14:59:33,489] INFO [Partition eventbus-0 broker=1] Log loaded for partition eventbus-0 with initial high watermark 0 (kafka.cluster.Partition)
kafka         | [2020-11-16 14:59:33,490] INFO [Broker id=1] Leader eventbus-0 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka         | [2020-11-16 14:59:33,507] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-2 (state.change.logger)
kafka         | [2020-11-16 14:59:33,508] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-1 (state.change.logger)
kafka         | [2020-11-16 14:59:33,509] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-0 (state.change.logger)
kafka         | [2020-11-16 14:59:33,549] TRACE [Controller id=1 epoch=1] Received response {error_code=0,partition_errors=[{topic_name=eventbus,partition_index=0,error_code=0,_tagged_fields={}},{topic_name=eventbus,partition_index=1,error_code=0,_tagged_fields={}},{topic_name=eventbus,partition_index=2,error_code=0,_tagged_fields={}}],_tagged_fields={}} for request LEADER_AND_ISR with correlation id 1 sent to broker kafka:19092 (id: 1 rack: null) (state.change.logger)
kafka         | [2020-11-16 14:59:33,564] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka         | [2020-11-16 14:59:33,569] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-1 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka         | [2020-11-16 14:59:33,570] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-2 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka         | [2020-11-16 14:59:33,574] INFO [Broker id=1] Add 3 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka         | [2020-11-16 14:59:33,576] TRACE [Controller id=1 epoch=1] Received response {error_code=0,_tagged_fields={}} for request UPDATE_METADATA with correlation id 2 sent to broker kafka:19092 (id: 1 rack: null) (state.change.logger)
kafka         | [2020-11-16 15:01:52,665] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka         | [2020-11-16 15:01:52,666] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka         | [2020-11-16 15:01:52,669] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
kafka         | [2020-11-16 15:01:52,675] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
kafka         | [2020-11-16 15:06:47,246] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka         | [2020-11-16 15:06:52,676] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka         | [2020-11-16 15:06:52,677] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka         | [2020-11-16 15:06:52,678] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
kafka         | [2020-11-16 15:06:52,679] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)

After reading this aspnetcore issue, I found that the problem lay in my implementation of the IHostedService that issues the request to Kafka.

The StartAsync method was performing the task, running until the request completed. By design, this method is meant to be fire and forget: start the task and then continue. I updated my KafkaAdmin service to a BackgroundService that overrides the ExecuteAsync method, as listed below. The tests subsequently no longer block.
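
Before the full listing, here is a minimal sketch (with hypothetical type names) of the difference between the two hosting patterns. The generic host awaits IHostedService.StartAsync for every registered service during startup, so an await that never completes there stalls startup; BackgroundService.StartAsync starts ExecuteAsync but does not await it to completion.

using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical types for illustration only.
public class BlockingAdminService : IHostedService
{
    // Host startup waits here until the Kafka request finishes.
    public async Task StartAsync(CancellationToken token) =>
        await RequestTopicAsync(token);

    public Task StopAsync(CancellationToken token) => Task.CompletedTask;

    private static Task RequestTopicAsync(CancellationToken token) =>
        Task.CompletedTask; // stand-in for the Kafka admin request
}

public class NonBlockingAdminService : BackgroundService
{
    // ExecuteAsync is started, but not awaited to completion, by the host,
    // so startup proceeds while the Kafka request runs.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken) =>
        await RequestTopicAsync(stoppingToken);

    private static Task RequestTopicAsync(CancellationToken token) =>
        Task.CompletedTask; // stand-in for the Kafka admin request
}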

using System;
using System.Threading;
using System.Threading.Tasks;

using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using KafkaAdmin.Kafka.Config;


namespace KafkaAdmin.Kafka
{
    public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);

    /// <summary>Background Service to make a request from Kafka to create a topic</summary>
    public class KafkaAdminService : BackgroundService
    {
        private KafkaAdminFactory _Factory { get; set; }
        private ILogger<KafkaAdminService> _Logger { get; set; }
        private KafkaConfig _Config { get; set; }


        /// <summary>
        /// Retrieve KafkaConfig from appsettings
        /// </summary>
        /// <param name="config">Config POCO from appsettings file</param>
        /// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
        /// <param name="logger">Logger instance</param>
        public KafkaAdminService(
            IOptions<KafkaConfig> config,
            KafkaAdminFactory clientFactory,
            ILogger<KafkaAdminService> logger)
        {
            if (clientFactory == null)
                throw new ArgumentNullException(nameof(clientFactory));

            if (config == null)
                throw new ArgumentNullException(nameof(config));

            _Config = config.Value ?? throw new ArgumentNullException(nameof(config));
            _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
            _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }


        /// <summary>
        /// Create a Kafka topic if it does not already exist
        /// </summary>
        /// <param name="token">Cancellation token required by IHostedService</param>
        /// <exception name="CreateTopicsException">
        /// Thrown for exceptions encountered except duplicate topic
        /// </exception>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using (var client = _Factory(_Config))
            {
                try
                {
                    _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                    _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                    _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                    await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);

                    _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
                }
                catch (CreateTopicsException e)
                {
                    if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                    {
                        _Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                        throw e;
                    }
                    else
                    {
                        _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                    }
                }
            }

            _Logger.LogInformation("Kafka Consumer thread started");

            await Task.CompletedTask;
        }


        /// <summary>
        /// Call base class dispose
        /// </summary>
        public override void Dispose()
        {
            base.Dispose();
        }
    }
}
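
For completeness, here is a sketch of how the service and the KafkaAdminFactory delegate might be wired up in Startup.ConfigureServices. The "Kafka" section name is an assumption; the rest follows the delegate and config shapes used above and is illustrative rather than taken from the repository.

// Inside Startup.ConfigureServices(IServiceCollection services):
services.Configure<KafkaConfig>(Configuration.GetSection("Kafka"));

// Register the factory delegate declared alongside KafkaAdminService so the
// hosted service can build an IAdminClient from the bound config.
KafkaAdminFactory factory = config =>
    new AdminClientBuilder(new AdminClientConfig
    {
        BootstrapServers = config.Consumer.BootstrapServers
    }).Build();
services.AddSingleton(factory);

// BackgroundService implementations are registered as hosted services.
services.AddHostedService<KafkaAdminService>();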

I am still puzzled as to why the live WebApp starts up successfully. Why is this only a problem for the TestServer?

I have created a repository that demonstrates the problem and the fix. The master branch contains the source code exhibiting the problem. The feat/fix branch contains the source code with the fix applied.

Hope this helps anyone else who encounters a similar problem!