Client app tries to produce a topic on Kafka but gets stuck and returns neither an error message nor 200 OK

We have deployed kafka and zookeeper pods on a kubernetes cluster. The two connect to each other correctly. However, when we try to produce a topic through the client application, the PUT request gets stuck in a pending state and doesn't return any message even after a long time! How can I debug this situation? The .yaml files for kafka, zookeeper and the client application are shown below:

kafka.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null

  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.42:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
        resources: {}
        volumeMounts:
        - mountPath: /var/run/docker.sock
          name: kafka-claim0
      hostname: kafka
      restartPolicy: Always
      volumes:
      - name: kafka-claim0
        persistentVolumeClaim:
          claimName: kafka-claim0
status: {}

zookeeper.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: zookeeper
    spec:
      containers:
      - env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        image: wurstmeister/zookeeper
        name: zookeeper
        ports:
        - containerPort: 2181
        resources: {}
      hostname: zookeeper
      restartPolicy: Always
status: {}

app.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    service: broker-service
  name: broker-app
spec:
  replicas: 1
  template:
    metadata:
      labels:
        service: broker-service
    spec:
      imagePullSecrets:
      - name: pullsecret
      containers:
      - env:
        - name: OHH_COMMON_REDEPLOY
          value: THIS_WILL_BE_REPLACED
        - name: ASPNETCORE_ENVIRONMENT
          value: docker
        image: localgitlabregistry/broker.app:v0.01
        name: broker-app
        imagePullPolicy: "Always"
        ports:
        - containerPort: 80
        - containerPort: 443
      nodeSelector:
        role: slave1
      restartPolicy: Always

The services are as follows:

kafka-service.yaml:

apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  - name: "9094"
    port: 9094
    targetPort: 9094
  clusterIP: None
#  type: NodePort
  selector:
    io.kompose.service: kafka
status:
  loadBalancer: {}

zookeeper-service.yaml:

apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  ports:
  - name: "2181"
    port: 2181
    targetPort: 2181
  selector:
    io.kompose.service: zookeeper
status:
  loadBalancer: {}

app-service.yaml:

apiVersion: v1
kind: Service
metadata:
  labels:
    service: broker-service
  name: broker-service
spec:
  ports:
  - name: "57270"
    port: 80
    targetPort: 80
  - name: "44348"
    port: 443
    targetPort: 443
  selector:
    service: broker-service
  type: NodePort

The logs from the kafka pod are as follows:

waiting for kafka to be ready
[Configuring] 'advertised.port' in '/opt/kafka/config/server.properties'
Excluding KAFKA_HOME from broker config
[Configuring] 'advertised.host.name' in '/opt/kafka/config/server.properties'
[Configuring] 'port' in '/opt/kafka/config/server.properties'
[Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
Excluding KAFKA_VERSION from broker config
[Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
[Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
[Configuring] 'zookeeper.connect.timeout.ms' in '/opt/kafka/config/server.properties'
[2019-09-29 08:06:56,783] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-29 08:06:57,767] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-09-29 08:06:57,768] INFO starting (kafka.server.KafkaServer)
[2019-09-29 08:06:57,769] INFO Connecting to zookeeper on 192.168.88.42:30573 (kafka.server.KafkaServer)
[2019-09-29 08:06:57,796] INFO [ZooKeeperClient Kafka server] Initializing a new session to 

.
.
.
[2019-09-29 08:06:57,804] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.version=4.4.0-116-generic (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,806] INFO Initiating client connection, connectString=192.168.88.42:30573 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@2667f029 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,822] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:57,847] INFO Opening socket connection to server 192.168.88.42/192.168.88.42:30573. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,865] INFO Socket connection established to 192.168.88.42/192.168.88.42:30573, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,880] INFO Session establishment complete on server 192.168.88.42/192.168.88.42:30573, sessionid = 0x10005366a620042, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,886] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:58,448] INFO Cluster ID = b8bTvrC2T6iidAcNqD482A (kafka.server.KafkaServer)
[2019-09-29 08:06:58,455] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:58,632] INFO KafkaConfig values: 
    advertised.host.name = kafka
    advertised.listeners = null
    advertised.port = 9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
.
.
.
    zookeeper.connect = 192.168.88.42:30573
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-09-29 08:06:58,659] INFO KafkaConfig values: 
    advertised.host.name = kafka
    advertised.listeners = null
    advertised.port = 9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true

    kafka.metrics.reporters = []
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    listeners = null
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.max.compaction.lag.ms = 9223372036854775807
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
        .
.
.
    unclean.leader.election.enable = false
    zookeeper.connect = 192.168.88.42:30573
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-09-29 08:06:58,721] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,722] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,724] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,797] INFO Log directory /kafka/kafka-logs-kafka not found, creating it. (kafka.log.LogManager)
[2019-09-29 08:06:58,814] INFO Loading logs. (kafka.log.LogManager)
[2019-09-29 08:06:58,834] INFO Logs loading complete in 20 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,869] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,877] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-09-29 08:06:59,505] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2019-09-29 08:06:59,549] INFO [SocketServer brokerId=1033] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
[2019-09-29 08:06:59,550] INFO [SocketServer brokerId=1033] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2019-09-29 08:06:59,587] INFO [ExpirationReaper-1033-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,600] INFO [ExpirationReaper-1033-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,614] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-09-29 08:06:59,716] INFO Creating /brokers/ids/1033 (is it secure? false) (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,743] INFO Stat of the created znode at /brokers/ids/1033 is: 776,776,1569744419734,1569744419734,1,0,0,72063325309108290,180,0,776
 (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,745] INFO Registered broker 1033 at path /brokers/ids/1033 with addresses: ArrayBuffer(EndPoint(kafka,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 776 (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,748] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:59,882] INFO [ExpirationReaper-1033-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,888] INFO [ExpirationReaper-1033-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,895] INFO [ExpirationReaper-1033-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,940] INFO [GroupCoordinator 1033]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,949] INFO [GroupCoordinator 1033]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,961] INFO [GroupMetadataManager brokerId=1033] Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-09-29 08:06:59,990] INFO [ProducerId Manager 1033]: Acquired new producerId block (brokerId:1033,blockStartProducerId:21000,blockEndProducerId:21999) by writing to Zk with path version 22 (kafka.coordinator.transaction.ProducerIdManager)
[2019-09-29 08:07:00,044] INFO [TransactionCoordinator id=1033] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,056] INFO [Transaction Marker Channel Manager 1033]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-09-29 08:07:00,061] INFO [TransactionCoordinator id=1033] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,207] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-09-29 08:07:00,289] INFO [SocketServer brokerId=1033] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-09-29 08:07:00,326] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka startTimeMs: 1569744420299 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,341] INFO [KafkaServer id=1033] started (kafka.server.KafkaServer)
creating topics: newsrawdata:1:1

Logs from the zookeeper pod:

2019-09-29 08:06:58,003 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:create cxid:0xd zxid:0x306 txntype:-1 reqpath:n/a Error Path:/config/brokers Error:KeeperErrorCode = NodeExists for /config/brokers
2019-09-29 08:07:00,421 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:multi cxid:0x3f zxid:0x30d txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2019-09-29 08:07:07,512 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /10.44.0.0:39244
2019-09-29 08:07:07,519 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@949] - Client attempting to establish new session at /10.44.0.0:39244
2019-09-29 08:07:07,521 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@694] - Established session 0x10005366a620043 with negotiated timeout 30000 for client /10.44.0.0:39244
2019-09-29 08:07:08,034 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x10005366a620043
2019-09-29 08:07:08,045 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /10.44.0.0:39244 which had sessionid 0x10005366a620043
2019-09-29 08:07:13,180 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 08:07:13,181 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
2019-09-29 09:07:13,180 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 09:07:13,182 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.

Logs from the client application:

Kafka Ip Server:kafka:9092
warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
      No XML encryptor configured. Key {16b9a9aa-732a-47ab-bd31-ce341be7f812} may be persisted to storage in unencrypted form.
Hosting environment: docker
Content root path: /app
Now listening on: http://[::]:80
Application started. Press Ctrl+C to shut down.

We set "BootstrapServers" to "kafka:9092" in the client application. It seems the client can resolve kafka inside the cluster and sees the IP of the kafka pod, but nothing happens when we send the PUT request. It is worth noting that the same configuration works as expected with docker-compose, outside the kubernetes cluster! What is wrong with this configuration?

Make sure your node carries the proper label: the node where the broker is deployed must have the role: slave1 label. Otherwise, simply remove the nodeSelector lines from the broker deployment file.
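
For reference, the nodeSelector in app.yaml only lets the pod schedule onto nodes that already carry the matching label (a minimal sketch; the node name is a placeholder):

      nodeSelector:
        role: slave1   # the target node must carry this label, e.g. added with:
                       # kubectl label nodes <node-name> role=slave1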

Then add these lines to the spec of the deployment config file:

  selector:
    matchLabels:
      io.kompose.service: kafka

This one is for kafka.yaml.
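
In context, the top of the kafka Deployment spec would then look like this (a sketch based on the manifest above; only the selector block is added):

spec:
  replicas: 1
  selector:
    matchLabels:
      io.kompose.service: kafka   # must match the pod template labels below
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.service: kafka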

You don't have to add labels to the services; specifying a selector is enough, so remove the labels field from the service config files.
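
Applied to kafka-service.yaml, a trimmed version would look roughly like this (a sketch keeping only the essential fields; the labels block from the original is dropped as suggested):

apiVersion: v1
kind: Service
metadata:
  name: kafka
spec:
  clusterIP: None          # headless service, as in the original
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  - name: "9094"
    port: 9094
    targetPort: 9094
  selector:
    io.kompose.service: kafka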

Then, in the kafka deployment config file, change these lines:

        - name: KAFKA_ZOOKEEPER_CONNECT
          # value: 192.168.88.42:30573
          value: your_zookeeper_service_ip:2181

The value line should contain the IP of your zookeeper service and port 2181; if your zookeeper service has the IP 192.168.88.42, that value is fine.
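
Alternatively, since the zookeeper Service above is named zookeeper and exposes port 2181, the broker could point at the cluster-internal DNS name instead of a hard-coded IP (a sketch, assuming kafka and zookeeper run in the same namespace):

        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper:2181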

Actually, the problem was related to the kafka listener configuration (KAFKA_ADVERTISED_LISTENERS, KAFKA_INTER_BROKER_LISTENER_NAME, KAFKA_LISTENERS). In any case, the deployment.yaml configuration below now works fine for me, without any changes to the service files.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INSIDE
        - name: KAFKA_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: INSIDE:PLAINTEXT
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.207:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
      hostname: kafka
      restartPolicy: Always

The kafka instance is now connected to zookeeper, and the client application can produce to and consume from kafka correctly.