What Is The Correct Usage of Confluent Kafka Client Within docker-compose Stack On A Cloud CI Server Such As GitLab or Travis?
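I am a new Kafka user and have managed to get a docker-compose stack working locally, so that functional tests run successfully from an ASP.NET Core 3.1 test service. The test service lives in the same docker-compose stack as the Kafka, Zookeeper and Rest-Proxy services, on the same network.
The SUT and the tests use the .NET Core client to create a topic at startup if it does not already exist.
As soon as I try to run this docker-compose stack on a remote GitLab.com CI server, the tests hang while creating the topic. The logs (see below) show that the .NET client is connecting to the correct internal service in the docker-compose stack, kafka:19092. There is some activity in the kafka service as it starts creating the topic, and then it blocks. I would expect to see a message in the logs confirming that the topic was created.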
.NET client creating the Kafka topic
/// <summary>Dispatch request to Kafka Broker to create Kafka topic from config</summary>
/// <param name="client">Kafka admin client</param>
/// <exception cref="CreateTopicsException">Thrown for errors except topic already exists</exception>
private async Task CreateTopicAsync(IAdminClient client)
{
    try
    {
        _Logger.LogInformation("Admin service trying to create Kafka Topic...");
        _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
        _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");
        await client.CreateTopicsAsync(new TopicSpecification[] {
            new TopicSpecification {
                Name = _Config.Topic.Name,
                NumPartitions = _Config.Topic.PartitionCount,
                ReplicationFactor = _Config.Topic.ReplicationCount
            }
        }, null);
        _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
    }
    catch (CreateTopicsException e)
    {
        if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
        {
            _Logger.LogError($"An error occurred creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
            // Rethrow with "throw;" so the original stack trace is preserved.
            throw;
        }
        else
        {
            _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
        }
    }
}
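How do I configure the kafka broker and rest-proxy docker-compose services to run on an external CI server? Is this possible?
Details of the docker-compose stack and the GitLab CI environment are included below...
Create the docker network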
docker network create --gateway 172.19.0.1 --subnet 172.19.0.0/16 broker
docker-compose stack with zookeeper, kafka, rest-proxy and the ASP.NET Core 3.1 integration test service
---
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      camnet:
        ipv4_address: 172.19.0.11
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:6.0.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    networks:
      camnet:
        ipv4_address: 172.19.0.21
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_BROKER_ID: 1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.0.0
    hostname: rest-proxy
    container_name: rest-proxy
    depends_on:
      - kafka
    ports:
      - 8082:8082
    networks:
      camnet:
        ipv4_address: 172.19.0.31
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:19092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
  # ASP.NET Core 3 integration tests
  # uses kafka dotnet client https://docs.confluent.io/current/clients/dotnet.html
  # to create topics from an ASP.NET Core Test Server
  webapp:
    build:
      context: ../
      dockerfile: Docker/Test/Dockerfile
      target: test
    hostname: webapp
    image: dcs3spp/webapp
    container_name: webapp
    depends_on:
      - kafka
      - rest-proxy
    networks:
      camnet:
        ipv4_address: 172.19.0.61
    environment:
      - ASPNETCORE_ENVIRONMENT=Docker
networks:
  camnet:
    external:
      name: broker
GitLab.com CI Docker network environment
$ docker network ls
NETWORK ID NAME DRIVER SCOPE
2b3286d21fee bridge bridge local
a17bf57d1a86 host host local
0252525b2ca4 none null local
$ docker network inspect bridge
[
{
"Name": "bridge",
"Id": "2b3286d21fee076047e78188b67c2912dfd388a170de3e3cf2ba8d5238e1c6c7",
"Created": "2020-11-16T14:53:35.574299006Z",
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": [
{
"Subnet": "172.18.0.0/16"
}
]
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {},
"Options": {
"com.docker.network.bridge.default_bridge": "true",
"com.docker.network.bridge.enable_icc": "true",
"com.docker.network.bridge.enable_ip_masquerade": "true",
"com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
"com.docker.network.bridge.name": "docker0",
"com.docker.network.driver.mtu": "1500"
},
"Labels": {}
}
]
$ docker network inspect host
[
{
"Name": "host",
"Id": "a17bf57d1a865512bebd3f7f73e0fd761d40b1d4f87765edeac6099e86b94339",
"Created": "2020-11-16T14:53:35.551372286Z",
"Scope": "local",
"Driver": "host",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": []
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {},
"Options": {},
"Labels": {}
}
]
$ docker network inspect none
[
{
"Name": "none",
"Id": "0252525b2ca4b28ddc0f950b472485167cfe18e003c62f3d09ce2a856880362a",
"Created": "2020-11-16T14:53:35.536741983Z",
"Scope": "local",
"Driver": "null",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": []
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {},
"Options": {},
"Labels": {}
}
]
$ docker network create --gateway 172.19.0.1 --subnet 172.19.0.0/16 broker
dbd923b4caacca225f52e8a82dfcad184a1652bde1b5976aa07bbddb2919126c
GitLab.com CI server logs
webapp | A total of 1 test files matched the specified pattern.
webapp | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
webapp | Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
webapp | info: WebApp.S3.S3Service[0]
webapp | Minio client created for endpoint minio:9000
webapp | info: WebApp.Kafka.ProducerService[0]
webapp | ProducerService constructor called
webapp | info: WebApp.Kafka.SchemaRegistry.Serdes.JsonDeserializer[0]
webapp | Constructed
webapp | info: WebApp.Kafka.ConsumerService[0]
webapp | Kafka consumer listening to camera topics =>
webapp | info: WebApp.Kafka.ConsumerService[0]
webapp | Camera Topic :: shinobi/RHSsYfiV6Z/xi5cncrNK6/trigger
webapp | info: WebApp.Kafka.ConsumerService[0]
webapp | Camera Topic :: shinobi/group/monitor/trigger
webapp | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
webapp | No XML encryptor configured. Key {47af6978-c38e-429f-9b34-455ca445c2d8} may be persisted to storage in unencrypted form.
webapp | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp | Admin service trying to create Kafka Topic...
webapp | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp | Topic::eventbus, ReplicationCount::1, PartitionCount::3
webapp | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp | Bootstrap Servers::kafka:19092
kafka | [2020-11-16 14:59:32,335] INFO Creating topic eventbus with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
kafka | [2020-11-16 14:59:32,543] INFO [Controller id=1] New topics: [Set(eventbus)], deleted topics: [HashSet()], new partition replica assignment [Map(eventbus-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), eventbus-1 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), eventbus-2 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))] (kafka.controller.KafkaController)
kafka | [2020-11-16 14:59:32,546] INFO [Controller id=1] New partition creation callback for eventbus-0,eventbus-1,eventbus-2 (kafka.controller.KafkaController)
kafka | [2020-11-16 14:59:32,557] INFO [Controller id=1 epoch=1] Changed partition eventbus-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,558] INFO [Controller id=1 epoch=1] Changed partition eventbus-1 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,559] INFO [Controller id=1 epoch=1] Changed partition eventbus-2 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,560] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,613] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-0 from NonExistentReplica to NewReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,614] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-1 from NonExistentReplica to NewReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,615] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-2 from NonExistentReplica to NewReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,616] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,770] INFO [Controller id=1 epoch=1] Changed partition eventbus-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka | [2020-11-16 14:59:32,772] INFO [Controller id=1 epoch=1] Changed partition eventbus-1 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka | [2020-11-16 14:59:32,773] INFO [Controller id=1 epoch=1] Changed partition eventbus-2 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka | [2020-11-16 14:59:32,804] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-0 (state.change.logger)
kafka | [2020-11-16 14:59:32,805] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-1 (state.change.logger)
kafka | [2020-11-16 14:59:32,806] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-2 (state.change.logger)
kafka | [2020-11-16 14:59:32,808] INFO [Controller id=1 epoch=1] Sending LeaderAndIsr request to broker 1 with 3 become-leader and 0 become-follower partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,818] INFO [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 for 3 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,820] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,821] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,822] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet(1) for 3 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,823] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,828] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-0 from NewReplica to OnlineReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,829] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-1 from NewReplica to OnlineReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,830] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-2 from NewReplica to OnlineReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,832] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,869] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-2 (state.change.logger)
kafka | [2020-11-16 14:59:32,870] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-1 (state.change.logger)
kafka | [2020-11-16 14:59:32,871] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-0 (state.change.logger)
kafka | [2020-11-16 14:59:32,873] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(eventbus-2, eventbus-1, eventbus-0) (kafka.server.ReplicaFetcherManager)
kafka | [2020-11-16 14:59:32,874] INFO [Broker id=1] Stopped fetchers as part of LeaderAndIsr request correlationId 1 from controller 1 epoch 1 as part of the become-leader transition for 3 partitions (state.change.logger)
kafka | [2020-11-16 14:59:33,345] INFO [Log partition=eventbus-2, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2020-11-16 14:59:33,421] INFO Created log for partition eventbus-2 in /var/lib/kafka/data/eventbus-2 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka | [2020-11-16 14:59:33,424] INFO [Partition eventbus-2 broker=1] No checkpointed highwatermark is found for partition eventbus-2 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,425] INFO [Partition eventbus-2 broker=1] Log loaded for partition eventbus-2 with initial high watermark 0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,429] INFO [Broker id=1] Leader eventbus-2 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka | [2020-11-16 14:59:33,462] INFO [Log partition=eventbus-1, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2020-11-16 14:59:33,468] INFO Created log for partition eventbus-1 in /var/lib/kafka/data/eventbus-1 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka | [2020-11-16 14:59:33,469] INFO [Partition eventbus-1 broker=1] No checkpointed highwatermark is found for partition eventbus-1 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,470] INFO [Partition eventbus-1 broker=1] Log loaded for partition eventbus-1 with initial high watermark 0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,470] INFO [Broker id=1] Leader eventbus-1 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka | [2020-11-16 14:59:33,484] INFO [Log partition=eventbus-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2020-11-16 14:59:33,488] INFO Created log for partition eventbus-0 in /var/lib/kafka/data/eventbus-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka | [2020-11-16 14:59:33,489] INFO [Partition eventbus-0 broker=1] No checkpointed highwatermark is found for partition eventbus-0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,489] INFO [Partition eventbus-0 broker=1] Log loaded for partition eventbus-0 with initial high watermark 0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,490] INFO [Broker id=1] Leader eventbus-0 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka | [2020-11-16 14:59:33,507] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-2 (state.change.logger)
kafka | [2020-11-16 14:59:33,508] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-1 (state.change.logger)
kafka | [2020-11-16 14:59:33,509] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-0 (state.change.logger)
kafka | [2020-11-16 14:59:33,549] TRACE [Controller id=1 epoch=1] Received response {error_code=0,partition_errors=[{topic_name=eventbus,partition_index=0,error_code=0,_tagged_fields={}},{topic_name=eventbus,partition_index=1,error_code=0,_tagged_fields={}},{topic_name=eventbus,partition_index=2,error_code=0,_tagged_fields={}}],_tagged_fields={}} for request LEADER_AND_ISR with correlation id 1 sent to broker kafka:19092 (id: 1 rack: null) (state.change.logger)
kafka | [2020-11-16 14:59:33,564] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,569] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-1 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,570] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-2 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,574] INFO [Broker id=1] Add 3 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,576] TRACE [Controller id=1 epoch=1] Received response {error_code=0,_tagged_fields={}} for request UPDATE_METADATA with correlation id 2 sent to broker kafka:19092 (id: 1 rack: null) (state.change.logger)
kafka | [2020-11-16 15:01:52,665] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka | [2020-11-16 15:01:52,666] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka | [2020-11-16 15:01:52,669] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
kafka | [2020-11-16 15:01:52,675] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:47,246] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka | [2020-11-16 15:06:52,676] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:52,677] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:52,678] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:52,679] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
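After reading this aspnetcore issue, I found that the problem was in the implementation of my IHostedService, which makes the request to Kafka.
Its StartAsync method was performing the task itself, running until the request completed. By design, this method is meant to be fire and forget, i.e. start the task and then continue. I updated my KafkaAdmin service to be a BackgroundService, overriding the ExecuteAsync method, as listed below.
After that change, the tests no longer block.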
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using KafkaAdmin.Kafka.Config;

namespace KafkaAdmin.Kafka
{
    public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);

    /// <summary>Background Service to make a request to Kafka to create a topic</summary>
    public class KafkaAdminService : BackgroundService, IDisposable
    {
        private KafkaAdminFactory _Factory { get; set; }
        private ILogger<KafkaAdminService> _Logger { get; set; }
        private KafkaConfig _Config { get; set; }

        /// <summary>
        /// Retrieve KafkaConfig from appsettings
        /// </summary>
        /// <param name="config">Config POCO from appsettings file</param>
        /// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
        /// <param name="logger">Logger instance</param>
        public KafkaAdminService(
            IOptions<KafkaConfig> config,
            KafkaAdminFactory clientFactory,
            ILogger<KafkaAdminService> logger)
        {
            // config must be checked before config.Value is dereferenced below.
            if (config == null)
                throw new ArgumentNullException(nameof(config));

            _Config = config.Value ?? throw new ArgumentNullException(nameof(config));
            _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
            _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// Create a Kafka topic if it does not already exist
        /// </summary>
        /// <param name="stoppingToken">Cancellation token signalled when the host is stopping</param>
        /// <exception cref="CreateTopicsException">
        /// Thrown for exceptions encountered except duplicate topic
        /// </exception>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using (var client = _Factory(_Config))
            {
                try
                {
                    _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                    _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                    _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");
                    await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);
                    _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
                }
                catch (CreateTopicsException e)
                {
                    if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                    {
                        _Logger.LogError($"An error occurred creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                        // Rethrow with "throw;" so the original stack trace is preserved.
                        throw;
                    }
                    else
                    {
                        _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                    }
                }
            }
            _Logger.LogInformation("Kafka Consumer thread started");
        }

        /// <summary>
        /// Call base class dispose
        /// </summary>
        public override void Dispose()
        {
            base.Dispose();
        }
    }
}
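The difference between the two startup patterns can be sketched without Kafka at all. The snippet below is only an illustration: BlockingStartupService, NonBlockingStartupService and StartupDemo are hypothetical names, not types from my project, and a task that never completes stands in for a CreateTopicsAsync request that hangs. It assumes the same Microsoft.Extensions.Hosting package used by the service above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical service: does its work inside StartAsync, as my original service did.
public sealed class BlockingStartupService : IHostedService
{
    private readonly Func<CancellationToken, Task> _work;
    public BlockingStartupService(Func<CancellationToken, Task> work) => _work = work;

    // The host awaits this task, so startup cannot finish until the work does.
    public Task StartAsync(CancellationToken cancellationToken) => _work(cancellationToken);

    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

// Hypothetical service: the same work, moved into BackgroundService.ExecuteAsync.
public sealed class NonBlockingStartupService : BackgroundService
{
    private readonly Func<CancellationToken, Task> _work;
    public NonBlockingStartupService(Func<CancellationToken, Task> work) => _work = work;

    // BackgroundService.StartAsync kicks this off and returns a completed task
    // when the work is still pending.
    protected override Task ExecuteAsync(CancellationToken stoppingToken) => _work(stoppingToken);
}

public static class StartupDemo
{
    // Returns whether each service's StartAsync task is already complete
    // while the underlying work is still pending.
    public static (bool BlockingStartCompleted, bool BackgroundStartCompleted) Run()
    {
        // Never completed: stands in for a request that hangs.
        var gate = new TaskCompletionSource<bool>();
        Func<CancellationToken, Task> pendingWork = _ => gate.Task;

        Task blockingStart = new BlockingStartupService(pendingWork).StartAsync(CancellationToken.None);

        using var background = new NonBlockingStartupService(pendingWork);
        Task backgroundStart = background.StartAsync(CancellationToken.None);

        return (blockingStart.IsCompleted, backgroundStart.IsCompleted);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine($"IHostedService StartAsync completed: {result.BlockingStartCompleted}");
        Console.WriteLine($"BackgroundService StartAsync completed: {result.BackgroundStartCompleted}");
    }
}
```

One caveat with this pattern: any synchronous code that runs in ExecuteAsync before its first incomplete await still executes inside StartAsync, so only the awaited part of the work is moved off the startup path.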
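I am still puzzled as to why the live WebApp starts successfully. Why is this only a problem with TestServer?
I have created a repository to demonstrate the problem and the fix. The master branch contains source code that exhibits the problem. The feat/fix branch contains source code with the fix applied.
Hope this helps anyone else who runs into a similar problem!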
我是新的 Kafka 用户,并且已经设法让 docker-compose 堆栈在本地工作,以便从 ASP.NET Core 3.1 测试服务成功地 运行 进行功能测试。这存在于与 Kafa、Zookeeper 和 Rest-Proxy 服务相同的 docker-compose 堆栈中,位于同一网络上。
SUT 和测试使用 .NET Core Client 在启动时创建主题(如果主题尚不存在)。
只要我尝试 运行 这个 docker-compose 远程 GitLab.com CI 服务器上的堆栈,测试就会在创建主题时挂起。日志(见下文)显示 .NET 客户端正在连接到 docker-compose 堆栈中正确的内部服务 kafka:19092
。 kafka 服务中有一些 activity 开始创建主题,然后它阻塞了。我应该会在日志中看到一条确认主题创建的消息。
.NET 客户端创建 Kafka 主题
/// <summary>Dispatch request to Kafka Broker to create Kafka topic from config</summary>
/// <param name="client">Kafka admin client</param>
/// <exception cref="">Thrown for errors except topic already exists</exception>
private async Task CreateTopicAsync(IAdminClient client)
{
try
{
_Logger.LogInformation("Admin service trying to create Kafka Topic...");
_Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
_Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");
await client.CreateTopicsAsync(new TopicSpecification[] {
new TopicSpecification {
Name = _Config.Topic.Name,
NumPartitions = _Config.Topic.PartitionCount,
ReplicationFactor = _Config.Topic.ReplicationCount
}
}, null);
_Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
}
catch (CreateTopicsException e)
{
if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
{
_Logger.LogInformation($"An error occured creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
throw e;
}
else
{
_Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
}
}
}
如何配置 kafka 代理和 rest 代理 docker-compose 服务到外部 CI 服务器上的 运行?这可能吗?
下面包含 docker-compose 堆栈和 GitLab CI 环境的详细信息...
创建docker网络
docker network create --gateway 172.19.0.1 --subnet 172.19.0.0/16 broker
docker-与 zookeeper、kafka、rest-proxy 和 ASP.NET Core 3.1 组成堆栈,用于集成测试服务
---
version: "3.8"
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
networks:
camnet:
ipv4_address: 172.19.0.11
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:6.0.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
networks:
camnet:
ipv4_address: 172.19.0.21
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_BROKER_ID: 1
KAFKA_DEFAULT_REPLICATION_FACTOR : 1
KAFKA_NUM_PARTITIONS: 3
rest-proxy:
image: confluentinc/cp-kafka-rest:6.0.0
hostname: rest-proxy
container_name: rest-proxy
depends_on:
- kafka
ports:
- 8082:8082
networks:
camnet:
ipv4_address: 172.19.0.31
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'kafka:19092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
# ASP.NET Core 3 integration tests
# uses kafka dotnet client https://docs.confluent.io/current/clients/dotnet.html
# to create topics from an ASP.NET Core Test Server
webapp:
build:
context: ../
dockerfile: Docker/Test/Dockerfile
target: test
hostname: webapp
image: dcs3spp/webapp
container_name: webapp
depends_on:
- kafka
- rest-proxy
networks:
camnet:
ipv4_address: 172.19.0.61
environment:
- ASPNETCORE_ENVIRONMENT=Docker
networks:
camnet:
external:
name: broker
GitLab.com CI Docker 网络环境
$ docker network ls
NETWORK ID NAME DRIVER SCOPE
2b3286d21fee bridge bridge local
a17bf57d1a86 host host local
0252525b2ca4 none null local
$ docker network inspect bridge
[
{
"Name": "bridge",
"Id": "2b3286d21fee076047e78188b67c2912dfd388a170de3e3cf2ba8d5238e1c6c7",
"Created": "2020-11-16T14:53:35.574299006Z",
"Scope": "local",
"Driver": "bridge",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": [
{
"Subnet": "172.18.0.0/16"
}
]
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {},
"Options": {
"com.docker.network.bridge.default_bridge": "true",
"com.docker.network.bridge.enable_icc": "true",
"com.docker.network.bridge.enable_ip_masquerade": "true",
"com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
"com.docker.network.bridge.name": "docker0",
"com.docker.network.driver.mtu": "1500"
},
"Labels": {}
}
]
$ docker network inspect host
[
{
"Name": "host",
"Id": "a17bf57d1a865512bebd3f7f73e0fd761d40b1d4f87765edeac6099e86b94339",
"Created": "2020-11-16T14:53:35.551372286Z",
"Scope": "local",
"Driver": "host",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": []
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {},
"Options": {},
"Labels": {}
}
]
$ docker network inspect none
[
{
"Name": "none",
"Id": "0252525b2ca4b28ddc0f950b472485167cfe18e003c62f3d09ce2a856880362a",
"Created": "2020-11-16T14:53:35.536741983Z",
"Scope": "local",
"Driver": "null",
"EnableIPv6": false,
"IPAM": {
"Driver": "default",
"Options": null,
"Config": []
},
"Internal": false,
"Attachable": false,
"Ingress": false,
"ConfigFrom": {
"Network": ""
},
"ConfigOnly": false,
"Containers": {},
"Options": {},
"Labels": {}
}
]
$ docker network create --gateway 172.19.0.1 --subnet 172.19.0.0/16 broker
dbd923b4caacca225f52e8a82dfcad184a1652bde1b5976aa07bbddb2919126c
GitLab.com CI Server Logs
webapp | A total of 1 test files matched the specified pattern.
webapp | warn: Microsoft.AspNetCore.DataProtection.Repositories.FileSystemXmlRepository[60]
webapp | Storing keys in a directory '/root/.aspnet/DataProtection-Keys' that may not be persisted outside of the container. Protected data will be unavailable when container is destroyed.
webapp | info: WebApp.S3.S3Service[0]
webapp | Minio client created for endpoint minio:9000
webapp | info: WebApp.Kafka.ProducerService[0]
webapp | ProducerService constructor called
webapp | info: WebApp.Kafka.SchemaRegistry.Serdes.JsonDeserializer[0]
webapp | Constructed
webapp | info: WebApp.Kafka.ConsumerService[0]
webapp | Kafka consumer listening to camera topics =>
webapp | info: WebApp.Kafka.ConsumerService[0]
webapp | Camera Topic :: shinobi/RHSsYfiV6Z/xi5cncrNK6/trigger
webapp | info: WebApp.Kafka.ConsumerService[0]
webapp | Camera Topic :: shinobi/group/monitor/trigger
webapp | warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
webapp | No XML encryptor configured. Key {47af6978-c38e-429f-9b34-455ca445c2d8} may be persisted to storage in unencrypted form.
webapp | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp | Admin service trying to create Kafka Topic...
webapp | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp | Topic::eventbus, ReplicationCount::1, PartitionCount::3
webapp | info: WebApp.Kafka.Admin.KafkaAdminService[0]
webapp | Bootstrap Servers::kafka:19092
kafka | [2020-11-16 14:59:32,335] INFO Creating topic eventbus with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1), 1 -> ArrayBuffer(1), 2 -> ArrayBuffer(1)) (kafka.zk.AdminZkClient)
kafka | [2020-11-16 14:59:32,543] INFO [Controller id=1] New topics: [Set(eventbus)], deleted topics: [HashSet()], new partition replica assignment [Map(eventbus-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), eventbus-1 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=), eventbus-2 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=))] (kafka.controller.KafkaController)
kafka | [2020-11-16 14:59:32,546] INFO [Controller id=1] New partition creation callback for eventbus-0,eventbus-1,eventbus-2 (kafka.controller.KafkaController)
kafka | [2020-11-16 14:59:32,557] INFO [Controller id=1 epoch=1] Changed partition eventbus-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,558] INFO [Controller id=1 epoch=1] Changed partition eventbus-1 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,559] INFO [Controller id=1 epoch=1] Changed partition eventbus-2 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,560] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,613] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-0 from NonExistentReplica to NewReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,614] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-1 from NonExistentReplica to NewReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,615] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-2 from NonExistentReplica to NewReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,616] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,770] INFO [Controller id=1 epoch=1] Changed partition eventbus-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka | [2020-11-16 14:59:32,772] INFO [Controller id=1 epoch=1] Changed partition eventbus-1 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka | [2020-11-16 14:59:32,773] INFO [Controller id=1 epoch=1] Changed partition eventbus-2 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
kafka | [2020-11-16 14:59:32,804] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-0 (state.change.logger)
kafka | [2020-11-16 14:59:32,805] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-1 (state.change.logger)
kafka | [2020-11-16 14:59:32,806] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition eventbus-2 (state.change.logger)
kafka | [2020-11-16 14:59:32,808] INFO [Controller id=1 epoch=1] Sending LeaderAndIsr request to broker 1 with 3 become-leader and 0 become-follower partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,818] INFO [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 for 3 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,820] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,821] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,822] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet(1) for 3 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,823] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 1 from controller 1 epoch 1 (state.change.logger)
kafka | [2020-11-16 14:59:32,828] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-0 from NewReplica to OnlineReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,829] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-1 from NewReplica to OnlineReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,830] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition eventbus-2 from NewReplica to OnlineReplica (state.change.logger)
kafka | [2020-11-16 14:59:32,832] INFO [Controller id=1 epoch=1] Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
kafka | [2020-11-16 14:59:32,869] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-2 (state.change.logger)
kafka | [2020-11-16 14:59:32,870] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-1 (state.change.logger)
kafka | [2020-11-16 14:59:32,871] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 1 from controller 1 epoch 1 starting the become-leader transition for partition eventbus-0 (state.change.logger)
kafka | [2020-11-16 14:59:32,873] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(eventbus-2, eventbus-1, eventbus-0) (kafka.server.ReplicaFetcherManager)
kafka | [2020-11-16 14:59:32,874] INFO [Broker id=1] Stopped fetchers as part of LeaderAndIsr request correlationId 1 from controller 1 epoch 1 as part of the become-leader transition for 3 partitions (state.change.logger)
kafka | [2020-11-16 14:59:33,345] INFO [Log partition=eventbus-2, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2020-11-16 14:59:33,421] INFO Created log for partition eventbus-2 in /var/lib/kafka/data/eventbus-2 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka | [2020-11-16 14:59:33,424] INFO [Partition eventbus-2 broker=1] No checkpointed highwatermark is found for partition eventbus-2 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,425] INFO [Partition eventbus-2 broker=1] Log loaded for partition eventbus-2 with initial high watermark 0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,429] INFO [Broker id=1] Leader eventbus-2 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka | [2020-11-16 14:59:33,462] INFO [Log partition=eventbus-1, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2020-11-16 14:59:33,468] INFO Created log for partition eventbus-1 in /var/lib/kafka/data/eventbus-1 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka | [2020-11-16 14:59:33,469] INFO [Partition eventbus-1 broker=1] No checkpointed highwatermark is found for partition eventbus-1 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,470] INFO [Partition eventbus-1 broker=1] Log loaded for partition eventbus-1 with initial high watermark 0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,470] INFO [Broker id=1] Leader eventbus-1 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka | [2020-11-16 14:59:33,484] INFO [Log partition=eventbus-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
kafka | [2020-11-16 14:59:33,488] INFO Created log for partition eventbus-0 in /var/lib/kafka/data/eventbus-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.6-IV0, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
kafka | [2020-11-16 14:59:33,489] INFO [Partition eventbus-0 broker=1] No checkpointed highwatermark is found for partition eventbus-0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,489] INFO [Partition eventbus-0 broker=1] Log loaded for partition eventbus-0 with initial high watermark 0 (kafka.cluster.Partition)
kafka | [2020-11-16 14:59:33,490] INFO [Broker id=1] Leader eventbus-0 starts at leader epoch 0 from offset 0 with high watermark 0 ISR [1] addingReplicas [] removingReplicas []. Previous leader epoch was -1. (state.change.logger)
kafka | [2020-11-16 14:59:33,507] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-2 (state.change.logger)
kafka | [2020-11-16 14:59:33,508] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-1 (state.change.logger)
kafka | [2020-11-16 14:59:33,509] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 1 from controller 1 epoch 1 for the become-leader transition for partition eventbus-0 (state.change.logger)
kafka | [2020-11-16 14:59:33,549] TRACE [Controller id=1 epoch=1] Received response {error_code=0,partition_errors=[{topic_name=eventbus,partition_index=0,error_code=0,_tagged_fields={}},{topic_name=eventbus,partition_index=1,error_code=0,_tagged_fields={}},{topic_name=eventbus,partition_index=2,error_code=0,_tagged_fields={}}],_tagged_fields={}} for request LEADER_AND_ISR with correlation id 1 sent to broker kafka:19092 (id: 1 rack: null) (state.change.logger)
kafka | [2020-11-16 14:59:33,564] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,569] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-1 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,570] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='eventbus', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], offlineReplicas=[]) for partition eventbus-2 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,574] INFO [Broker id=1] Add 3 partitions and deleted 0 partitions from metadata cache in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 2 (state.change.logger)
kafka | [2020-11-16 14:59:33,576] TRACE [Controller id=1 epoch=1] Received response {error_code=0,_tagged_fields={}} for request UPDATE_METADATA with correlation id 2 sent to broker kafka:19092 (id: 1 rack: null) (state.change.logger)
kafka | [2020-11-16 15:01:52,665] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka | [2020-11-16 15:01:52,666] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka | [2020-11-16 15:01:52,669] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
kafka | [2020-11-16 15:01:52,675] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:47,246] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
kafka | [2020-11-16 15:06:52,676] INFO [Controller id=1] Processing automatic preferred replica leader election (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:52,677] TRACE [Controller id=1] Checking need to trigger auto leader balancing (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:52,678] DEBUG [Controller id=1] Topics not in preferred replica for broker 1 Map() (kafka.controller.KafkaController)
kafka | [2020-11-16 15:06:52,679] TRACE [Controller id=1] Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
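After reading this aspnetcore issue, I discovered that the problem was in my IHostedService implementation that makes the request to Kafka. Its StartAsync method was performing the task itself and running until the request completed. By design, this method is meant to be fire-and-forget: start the task, then continue.
I updated my KafkaAdmin service to be a BackgroundService, overriding the ExecuteAsync method, as listed below. After that change, the tests no longer block.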
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using KafkaAdmin.Kafka.Config;

namespace KafkaAdmin.Kafka
{
    public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);

    /// <summary>Background service that asks the Kafka broker to create a topic</summary>
    public class KafkaAdminService : BackgroundService, IDisposable
    {
        private KafkaAdminFactory _Factory { get; set; }
        private ILogger<KafkaAdminService> _Logger { get; set; }
        private KafkaConfig _Config { get; set; }

        /// <summary>
        /// Retrieve KafkaConfig from appsettings
        /// </summary>
        /// <param name="config">Config POCO from appsettings file</param>
        /// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
        /// <param name="logger">Logger instance</param>
        public KafkaAdminService(
            IOptions<KafkaConfig> config,
            KafkaAdminFactory clientFactory,
            ILogger<KafkaAdminService> logger)
        {
            // Each argument is validated once; a null config or a null config.Value both fail here.
            _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
            _Config = config?.Value ?? throw new ArgumentNullException(nameof(config));
            _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// Create a Kafka topic if it does not already exist
        /// </summary>
        /// <param name="stoppingToken">Cancellation token supplied by BackgroundService</param>
        /// <exception cref="CreateTopicsException">
        /// Thrown for exceptions encountered except duplicate topic
        /// </exception>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using (var client = _Factory(_Config))
            {
                try
                {
                    _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                    _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                    _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                    await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);

                    _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
                }
                catch (CreateTopicsException e)
                {
                    if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                    {
                        _Logger.LogError($"An error occurred creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                        throw; // rethrow without resetting the stack trace
                    }
                    else
                    {
                        _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                    }
                }
            }

            _Logger.LogInformation("Kafka Consumer thread started");
        }

        /// <summary>
        /// Call base class dispose
        /// </summary>
        public override void Dispose()
        {
            base.Dispose();
        }
    }
}
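The difference between the two start-up styles can be seen without Kafka or the hosting package. The following is only a minimal sketch: IStartable, BlockingStarter, BackgroundStarter and Demo are hypothetical names invented for illustration, and the two-second Task.Delay stands in for a slow topic-creation request. BackgroundStarter mimics what BackgroundService.StartAsync does: it starts the work, keeps a reference to the task, and returns straight away unless the work has already finished.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for IHostedService, reduced to the one method that matters here.
public interface IStartable
{
    Task StartAsync(CancellationToken cancellationToken);
}

// The original pattern: StartAsync does not complete until the slow request does.
public sealed class BlockingStarter : IStartable
{
    private readonly Func<Task> _slowRequest;

    public BlockingStarter(Func<Task> slowRequest) => _slowRequest = slowRequest;

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await _slowRequest();
    }
}

// The BackgroundService pattern: start the work, remember the task, return at once.
public sealed class BackgroundStarter : IStartable
{
    private readonly Func<Task> _slowRequest;

    public BackgroundStarter(Func<Task> slowRequest) => _slowRequest = slowRequest;

    // The running work, kept so that it can be observed or awaited later.
    public Task Execution { get; private set; } = Task.CompletedTask;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        Execution = _slowRequest();
        // Surface the task only if it already finished (for example, it failed immediately).
        return Execution.IsCompleted ? Execution : Task.CompletedTask;
    }
}

public static class Demo
{
    // Measures how long a caller (the host) has to wait for StartAsync to complete.
    public static async Task<TimeSpan> MeasureStartAsync(IStartable service)
    {
        var watch = Stopwatch.StartNew();
        await service.StartAsync(CancellationToken.None);
        return watch.Elapsed;
    }

    public static async Task Main()
    {
        Func<Task> slowRequest = () => Task.Delay(TimeSpan.FromSeconds(2));

        Console.WriteLine($"Blocking start took   {await MeasureStartAsync(new BlockingStarter(slowRequest))}");
        Console.WriteLine($"Background start took {await MeasureStartAsync(new BackgroundStarter(slowRequest))}");
    }
}
```

In this sketch the blocking variant makes the caller wait for the full delay, while the background variant returns almost immediately and leaves the work running. If the slow request never completed, the blocking variant would never return from StartAsync at all.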
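I am still puzzled as to why the live WebApp started successfully. Why is this only an issue with TestServer?
I have created a repository that demonstrates the problem and the fix. The master branch contains the source code that exhibits the problem. The feat/fix branch contains the source code with the fix.
Hope this helps others who encounter a similar problem!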
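For anyone diagnosing a similar hang on a CI server, it may help to bound the wait so that a stuck request fails with an exception instead of stalling the job until the runner times out. The following is a minimal sketch rather than part of the fix above: WithTimeout and TimeoutDemo are hypothetical names, and the never-completing task stands in for a request that hangs. It uses only Task.WhenAny and Task.Delay, so it works on .NET Core 3.1.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TaskTimeoutExtensions
{
    // Hypothetical helper: completes with the original task's outcome, or throws
    // TimeoutException if the task has not finished within the given period.
    public static async Task WithTimeout(this Task task, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            var delay = Task.Delay(timeout, cts.Token);
            var finished = await Task.WhenAny(task, delay);
            if (finished != task)
            {
                throw new TimeoutException($"Operation did not complete within {timeout}.");
            }

            cts.Cancel(); // the timer is no longer needed
            await task;   // propagate any exception from the original task
        }
    }
}

public static class TimeoutDemo
{
    public static async Task Main()
    {
        // Stand-in for a request that never completes.
        Task neverCompletes = new TaskCompletionSource<bool>().Task;

        try
        {
            await neverCompletes.WithTimeout(TimeSpan.FromSeconds(1));
        }
        catch (TimeoutException e)
        {
            Console.WriteLine(e.Message);
        }
    }
}
```

Applied to the service above, the call would read along the lines of `await client.CreateTopicsAsync(...).WithTimeout(TimeSpan.FromSeconds(30));`, where the 30 seconds is an arbitrary value chosen for illustration. Note that the helper only stops waiting; it does not cancel the underlying request.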