Client app tries to produce to a topic on Kafka but hangs, returning neither an error message nor 200 OK
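We have deployed kafka and zookeeper pods on a kubernetes cluster. The two connect to each other correctly. However, when we try to produce to a topic from the client app, the PUT request stays pending and returns nothing even after a long time! How can I debug this situation?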
The .yaml files for kafka, zookeeper, and the client app are as follows:
kafka.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.42:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
        resources: {}
        volumeMounts:
        - mountPath: /var/run/docker.sock
          name: kafka-claim0
      hostname: kafka
      restartPolicy: Always
      volumes:
      - name: kafka-claim0
        persistentVolumeClaim:
          claimName: kafka-claim0
status: {}
zookeeper.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: zookeeper
    spec:
      containers:
      - env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        image: wurstmeister/zookeeper
        name: zookeeper
        ports:
        - containerPort: 2181
        resources: {}
      hostname: zookeeper
      restartPolicy: Always
status: {}
app.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    service: broker-service
  name: broker-app
spec:
  replicas: 1
  template:
    metadata:
      labels:
        service: broker-service
    spec:
      imagePullSecrets:
      - name: pullsecret
      containers:
      - env:
        - name: OHH_COMMON_REDEPLOY
          value: THIS_WILL_BE_REPLACED
        - name: ASPNETCORE_ENVIRONMENT
          value: docker
        image: localgitlabregistry/broker.app:v0.01
        name: broker-app
        imagePullPolicy: "Always"
        ports:
        - containerPort: 80
        - containerPort: 443
      nodeSelector:
        role: slave1
      restartPolicy: Always
The services are as follows:
kafka-service.yaml:
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  - name: "9094"
    port: 9094
    targetPort: 9094
  clusterIP: None
  # type: NodePort
  selector:
    io.kompose.service: kafka
status:
  loadBalancer: {}
zookeeper-service.yaml:
apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  ports:
  - name: "2181"
    port: 2181
    targetPort: 2181
  selector:
    io.kompose.service: zookeeper
status:
  loadBalancer: {}
app-service.yaml:
apiVersion: v1
kind: Service
metadata:
  labels:
    service: broker-service
  name: broker-service
spec:
  ports:
  - name: "57270"
    port: 80
    targetPort: 80
  - name: "44348"
    port: 443
    targetPort: 443
  selector:
    service: broker-service
  type: NodePort
The logs of the kafka pod are as follows:
waiting for kafka to be ready
[Configuring] 'advertised.port' in '/opt/kafka/config/server.properties'
Excluding KAFKA_HOME from broker config
[Configuring] 'advertised.host.name' in '/opt/kafka/config/server.properties'
[Configuring] 'port' in '/opt/kafka/config/server.properties'
[Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
Excluding KAFKA_VERSION from broker config
[Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
[Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
[Configuring] 'zookeeper.connect.timeout.ms' in '/opt/kafka/config/server.properties'
[2019-09-29 08:06:56,783] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-29 08:06:57,767] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-09-29 08:06:57,768] INFO starting (kafka.server.KafkaServer)
[2019-09-29 08:06:57,769] INFO Connecting to zookeeper on 192.168.88.42:30573 (kafka.server.KafkaServer)
[2019-09-29 08:06:57,796] INFO [ZooKeeperClient Kafka server] Initializing a new session to
.
.
.
[2019-09-29 08:06:57,804] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.version=4.4.0-116-generic (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,806] INFO Initiating client connection, connectString=192.168.88.42:30573 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@2667f029 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,822] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:57,847] INFO Opening socket connection to server 192.168.88.42/192.168.88.42:30573. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,865] INFO Socket connection established to 192.168.88.42/192.168.88.42:30573, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,880] INFO Session establishment complete on server 192.168.88.42/192.168.88.42:30573, sessionid = 0x10005366a620042, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,886] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:58,448] INFO Cluster ID = b8bTvrC2T6iidAcNqD482A (kafka.server.KafkaServer)
[2019-09-29 08:06:58,455] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:58,632] INFO KafkaConfig values:
advertised.host.name = kafka
advertised.listeners = null
advertised.port = 9092
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
.
.
.
zookeeper.connect = 192.168.88.42:30573
zookeeper.connection.timeout.ms = 6000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2019-09-29 08:06:58,659] INFO KafkaConfig values:
advertised.host.name = kafka
advertised.listeners = null
advertised.port = 9092
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms = 9223372036854775807
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
.
.
.
unclean.leader.election.enable = false
zookeeper.connect = 192.168.88.42:30573
zookeeper.connection.timeout.ms = 6000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
(kafka.server.KafkaConfig)
[2019-09-29 08:06:58,721] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,722] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,724] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,797] INFO Log directory /kafka/kafka-logs-kafka not found, creating it. (kafka.log.LogManager)
[2019-09-29 08:06:58,814] INFO Loading logs. (kafka.log.LogManager)
[2019-09-29 08:06:58,834] INFO Logs loading complete in 20 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,869] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,877] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-09-29 08:06:59,505] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2019-09-29 08:06:59,549] INFO [SocketServer brokerId=1033] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
[2019-09-29 08:06:59,550] INFO [SocketServer brokerId=1033] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2019-09-29 08:06:59,587] INFO [ExpirationReaper-1033-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,600] INFO [ExpirationReaper-1033-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,614] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-09-29 08:06:59,716] INFO Creating /brokers/ids/1033 (is it secure? false) (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,743] INFO Stat of the created znode at /brokers/ids/1033 is: 776,776,1569744419734,1569744419734,1,0,0,72063325309108290,180,0,776
(kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,745] INFO Registered broker 1033 at path /brokers/ids/1033 with addresses: ArrayBuffer(EndPoint(kafka,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 776 (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,748] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:59,882] INFO [ExpirationReaper-1033-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,888] INFO [ExpirationReaper-1033-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,895] INFO [ExpirationReaper-1033-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,940] INFO [GroupCoordinator 1033]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,949] INFO [GroupCoordinator 1033]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,961] INFO [GroupMetadataManager brokerId=1033] Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-09-29 08:06:59,990] INFO [ProducerId Manager 1033]: Acquired new producerId block (brokerId:1033,blockStartProducerId:21000,blockEndProducerId:21999) by writing to Zk with path version 22 (kafka.coordinator.transaction.ProducerIdManager)
[2019-09-29 08:07:00,044] INFO [TransactionCoordinator id=1033] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,056] INFO [Transaction Marker Channel Manager 1033]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-09-29 08:07:00,061] INFO [TransactionCoordinator id=1033] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,207] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-09-29 08:07:00,289] INFO [SocketServer brokerId=1033] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-09-29 08:07:00,326] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka startTimeMs: 1569744420299 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,341] INFO [KafkaServer id=1033] started (kafka.server.KafkaServer)
creating topics: newsrawdata:1:1
Logs from the zookeeper pod:
2019-09-29 08:06:58,003 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:create cxid:0xd zxid:0x306 txntype:-1 reqpath:n/a Error Path:/config/brokers Error:KeeperErrorCode = NodeExists for /config/brokers
2019-09-29 08:07:00,421 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:multi cxid:0x3f zxid:0x30d txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2019-09-29 08:07:07,512 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /10.44.0.0:39244
2019-09-29 08:07:07,519 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@949] - Client attempting to establish new session at /10.44.0.0:39244
2019-09-29 08:07:07,521 [myid:] - INFO [SyncThread:0:ZooKeeperServer@694] - Established session 0x10005366a620043 with negotiated timeout 30000 for client /10.44.0.0:39244
2019-09-29 08:07:08,034 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x10005366a620043
2019-09-29 08:07:08,045 [myid:] - INFO [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /10.44.0.0:39244 which had sessionid 0x10005366a620043
2019-09-29 08:07:13,180 [myid:] - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 08:07:13,181 [myid:] - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
2019-09-29 09:07:13,180 [myid:] - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 09:07:13,182 [myid:] - INFO [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
Logs from the client app:
Kafka Ip Server:kafka:9092
warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
No XML encryptor configured. Key {16b9a9aa-732a-47ab-bd31-ce341be7f812} may be persisted to storage in unencrypted form.
Hosting environment: docker
Content root path: /app
Now listening on: http://[::]:80
Application started. Press Ctrl+C to shut down.
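We set "BootstrapServers" to "kafka:9092" in the client app. It seems the client can resolve kafka inside the cluster and sees the kafka pod's IP, but nothing happens when we send the PUT request. It is worth noting that the same configuration works as expected with docker-compose, outside the kubernetes cluster! What is wrong with this configuration?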
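Make sure your nodes carry the right label: the node that the broker app is scheduled on must have the label role: slave1. Otherwise, just remove the nodeSelector lines from the broker app's deployment file.
Then add these lines to the spec of the deployment file: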
selector:
  matchLabels:
    io.kompose.service: kafka
This one is for kafka.yaml.
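For context, on clusters that serve Deployments only under apps/v1, spec.selector is required and has to match the labels of the pod template. A minimal sketch of where the selector sits, reusing the names from kafka.yaml and leaving out the env entries for brevity (the switch to apps/v1 is an assumption about your cluster version, not something the question states):

```yaml
apiVersion: apps/v1            # assumption: the cluster serves apps/v1 Deployments
kind: Deployment
metadata:
  name: kafka
  labels:
    io.kompose.service: kafka
spec:
  replicas: 1
  selector:                    # must match template.metadata.labels below
    matchLabels:
      io.kompose.service: kafka
  template:
    metadata:
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
```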
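You do not have to add labels to the services; specifying the selector is enough, so you can remove the labels field from the service files.
Then, in the kafka deployment file, change these lines: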
name: KAFKA_ZOOKEEPER_CONNECT
#value: 192.168.88.42:30573
value: your_zookeeper_service_ip:2181
The value should consist of your zookeeper service's IP and port 2181; 192.168.88.42 is the right address only if that is the IP of your zookeeper service.
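As an alternative to a hard-coded IP, the Service's DNS name can be used from inside the cluster. This is a sketch, assuming the Service from zookeeper-service.yaml (named zookeeper, port 2181) lives in the same namespace as the kafka pod and that cluster DNS is running:

```yaml
- name: KAFKA_ZOOKEEPER_CONNECT
  # Assumption: "zookeeper" is the Service name from zookeeper-service.yaml,
  # resolvable via cluster DNS from the kafka pod's namespace.
  value: zookeeper:2181
```

From another namespace the fully qualified form would be needed, for example zookeeper.<namespace>.svc.cluster.local:2181, where the namespace is whatever the Service was created in.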
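Actually, the problem was related to the kafka listener configuration (KAFKA_ADVERTISED_LISTENERS, KAFKA_INTER_BROKER_LISTENER_NAME, KAFKA_LISTENERS). In any case, the deployment.yaml configuration below now works fine for me, without any changes to the service files.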
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INSIDE
        - name: KAFKA_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: INSIDE:PLAINTEXT
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.207:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
      hostname: kafka
      restartPolicy: Always
The kafka instance now connects to zookeeper, and the client app can produce to and consume from kafka correctly.
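The reason the listener settings matter: a Kafka client uses the bootstrap address only for the first connection, then reconnects to whatever address the broker advertises in its metadata, so an advertised address the client cannot reach tends to show up as a produce call that hangs. A variant that spells out the advertised host instead of leaving it empty might look like the sketch below. It is untested here and assumes the headless Service name kafka from kafka-service.yaml resolves for every client inside the cluster:

```yaml
- name: KAFKA_LISTENERS
  value: INSIDE://:9092            # bind on all interfaces inside the pod
- name: KAFKA_ADVERTISED_LISTENERS
  # Assumption: clients resolve "kafka" through the Service in kafka-service.yaml.
  value: INSIDE://kafka:9092
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
  value: INSIDE:PLAINTEXT
- name: KAFKA_INTER_BROKER_LISTENER_NAME
  value: INSIDE
```

Clients outside the cluster would need an additional listener advertised on an address they can reach; that case is not covered by the configuration in this thread.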