Connecting to Kafka from another namespace inside k8s
I have the following configuration for Kafka and ZooKeeper in my minikube:
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  namespace: kafka
spec:
  selector:
    app: kafka
  ports:
    - protocol: TCP
      port: 9092
      name: kafka-port
    - protocol: TCP
      port: 9094
      name: kafka-port-out
    - protocol: TCP
      port: 2181
      name: kafka-zk
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: kafka
        k8s-app: kube-dns
    spec:
      containers:
        - name: kafka-container
          image: bitnami/kafka:latest
          env:
            - name: 'ALLOW_PLAINTEXT_LISTENER'
              value: 'yes'
            - name: 'KAFKA_CFG_ZOOKEEPER_CONNECT'
              value: 'zookeeper-service:2181'
            - name: 'KAFKA_CFG_LISTENERS'
              value: 'PLAINTEXT://:9092'
            - name: 'KAFKA_CFG_ADVERTISED_LISTENERS' # if I comment this and the next line it works only locally
              value: 'PLAINTEXT://kafka-service.kafka:9092'
          ports:
            - containerPort: 9092
              name: kafka-port
            - containerPort: 9094
              name: kafka-port-out
            - containerPort: 5555
              name: kafka-port-jmx
            - containerPort: 2181
              name: kafka-zk
This is the configuration for my ZooKeeper:
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service
  namespace: kafka
spec:
  selector:
    app: zookeeper
  ports:
    - protocol: TCP
      port: 2181
      name: zookeeper-port
    - protocol: TCP
      port: 2888
      name: zookeeper-peer
    - protocol: TCP
      port: 3888
      name: leader-election
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zookeeper-deployment
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: zookeeper
        k8s-app: kube-dns
    spec:
      containers:
        - name: zookeeper-container
          image: bitnami/zookeeper:latest
          env:
            - name: 'ALLOW_ANONYMOUS_LOGIN'
              value: 'yes'
            - name: 'ZOOKEEPER_ID'
              value: '1'
          ports:
            - containerPort: 2181
              name: zookeeper-port
            - containerPort: 2888
              name: zookeeper-peer
            - containerPort: 3888
              name: leader-election
I also have another deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkacat-deployment
  # namespace: debug # I was using it from another namespace, but it was not working so I've tried to use the same
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafkacat
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: kafkacat
        k8s-app: kube-dns
    spec:
      containers:
        - name: kafkacat-container
          image: edenhill/kafkacat:1.5.0
Then I tried to telnet to the service, and it worked:
telnet kafka-service.kafka 9092
Trying 10.101.87.127...
Connected to kafka-service.kafka.svc.cluster.local.
Escape character is '^]'.
Here is the nslookup output:
nslookup kafka-service.kafka
Server: 10.96.0.10
Address: 10.96.0.10#53
Name: kafka-service.kafka.svc.cluster.local
Address: 10.101.87.127
But when I try to reach it with kafkacat, this is what I get:
kafkacat -b kafka-service.kafka:9092 -L
% ERROR: Failed to acquire metadata: Local: Timed out
So I guess the problem is in the Kafka configuration.
If I comment out the env var KAFKA_CFG_ADVERTISED_LISTENERS,
fetching metadata works:
# kafkacat -b kafka-service.kafka:9092 -L
Metadata for all topics (from broker -1: kafka-service.kafka:9092/bootstrap):
1 brokers:
broker 1001 at kafka-deployment-858c5c7f98-tt7sr:9092
2 topics:
topic "my_topic" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
topic "__consumer_offsets" with 50 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
But if I then try to produce a message:
kafkacat -b kafka-service.kafka:9092 -P -t my_topic
oi
% ERROR: Local: Host resolution failure: kafka-deployment-858c5c7f98-w2dm5:9092/1001: Failed to resolve 'kafka-deployment-858c5c7f98-w2dm5:9092': Temporary failure in name resolution (after 15382734ms in state INIT)
And if I try to consume:
kafkacat -b kafka-service.kafka:9092 -C -t my_topic
% ERROR: Local: Host resolution failure: kafka-deployment-858c5c7f98-w2dm5:9092/1001: Failed to resolve 'kafka-deployment-858c5c7f98-w2dm5:9092': Temporary failure in name resolution (after 15406287ms in state INIT)
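I have also tried setting KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-service.kafka.svc.cluster.local:9092
but I still get the timeout when I try to fetch metadata with kafkacat.
The resolution failures above happen because, without that variable, the advertised listener is the pod hostname, which the client cannot resolve.
How can I fix the Kafka configuration in the cluster?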
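I applied exactly the same manifests as you provided, except that I added a `tail -f /dev/null` command to the kafkacat-deployment pod to keep it running, and I was able to produce to and consume from topics.
I am running a k3s cluster in k3d: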
└[~]> kubectl get nodes
NAME STATUS ROLES AGE VERSION
k3d-dev-server Ready master 31m v1.17.3+k3s1
k3d-dev-worker-1 Ready <none> 31m v1.17.3+k3s1
kafkacat-deployment manifest:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkacat-deployment
  # namespace: debug # I was using it from another namespace, but it was not working so I've tried to use the same
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafkacat
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: kafkacat
        k8s-app: kube-dns
    spec:
      containers:
        - name: kafkacat-container
          image: edenhill/kafkacat:1.5.0
          resources: {}
          command:
            - sh
            - -c
            - "exec tail -f /dev/null"
Terminal logs from the kafkacat-deployment pod in the kafka namespace:
/ # kafkacat -b kafka-service.kafka:9092 -L
Metadata for all topics (from broker 1001: kafka-service.kafka:9092/1001):
1 brokers:
broker 1001 at kafka-service.kafka:9092 (controller)
0 topics:
/ # kafkacat -b kafka-service.kafka:9092 -P -t my_topic
hi from kafkacat
/ # kafkacat -b kafka-service.kafka:9092 -C -t my_topic
hi from kafkacat
% Reached end of topic my_topic [0] at offset 1
/ # kafkacat -b kafka-service:9092 -L
Metadata for all topics (from broker -1: kafka-service:9092/bootstrap):
1 brokers:
broker 1001 at kafka-service.kafka:9092 (controller)
1 topics:
topic "my_topic" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
/ # kafkacat -b kafka-service:9092 -P -t my_topic
hi from kafka2
/ # kafkacat -b kafka-service:9092 -C -t my_topic
hi from kafkacat
hi from kafka2
% Reached end of topic my_topic [0] at offset 2
Terminal logs from the kafkacat-deployment pod in the debug namespace:
└[~]> kubectl exec -it kafkacat-deployment-76f9c9db6d-8fth4 -n debug -- ash
/ # kafkacat -b kafka-service.kafka:9092 -L
Metadata for all topics (from broker 1001: kafka-service.kafka:9092/1001):
1 brokers:
broker 1001 at kafka-service.kafka:9092 (controller)
1 topics:
topic "my_topic" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
/ # kafkacat -b kafka-service.kafka:9092 -P -t my_topic
hi from debug namespace
/ # kafkacat -b kafka-service.kafka:9092 -C -t my_topic
hi from kafkacat
hi from kafka2
hi from debug namespace
% Reached end of topic my_topic [0] at offset 3
After applying a workaround for a bug in minikube, I got it working in minikube as well:
minikube ssh
sudo ip link set docker0 promisc on
I then deleted the running kafka-deployment pod, and after it restarted it worked from both namespaces:
(⎈ |minikube:default)➜ ~ kubectl delete pod -n kafka kafka-deployment-5c4f64599f-kn9xt
pod "kafka-deployment-5c4f64599f-kn9xt" deleted
(⎈ |minikube:default)➜ ~ kubectl exec -n kafka -it kafkacat-deployment-b595d9ccd-4bht7 -- ash
/ # kafkacat -b kafka-service.kafka:9092 -L
Metadata for all topics (from broker 1003: kafka-service.kafka:9092/1003):
1 brokers:
broker 1003 at kafka-service.kafka:9092 (controller)
0 topics:
/ #
(⎈ |minikube:default)➜ ~ kubectl exec -n debug -it kafkacat-deployment-b595d9ccd-pgzv6 -- ash
/ # kafkacat -b kafka-service.kafka:9092 -L
Metadata for all topics (from broker 1003: kafka-service.kafka:9092/1003):
1 brokers:
broker 1003 at kafka-service.kafka:9092 (controller)
0 topics:
/ #
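Disclaimer
This was tested with the Confluent Docker images, and it works in my local minikube environment.
To access Kafka from a different namespace, you can use something like the following: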
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
      k8s-app: kube-dns
  template:
    metadata:
      labels:
        app: kafka
        k8s-app: kube-dns
    spec:
      containers:
        - name: kafka-container
          image: confluentinc/cp-kafka:latest
          env:
            - name: KAFKA_POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://${KAFKA_POD_IP}:9092
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: PLAINTEXT
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-service:2181
          command: [ "sh","-c" ]
          args:
            - 'KAFKA_ADVERTISED_LISTENERS="PLAINTEXT://${KAFKA_POD_IP}:9092" && /etc/confluent/docker/run'
          ports:
            - containerPort: 9092
              name: kafka-port
            - containerPort: 9094
              name: kafka-port-out
            - containerPort: 5555
              name: kafka-port-jmx
            - containerPort: 2181
              name: kafka-zk
Note that this is a workaround.
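A possible simplification of the manifest above, offered as an untested sketch: Kubernetes does not expand the shell-style `${KAFKA_POD_IP}` inside an env `value`, which is presumably why the manifest re-assigns the variable in a shell command before starting the broker. Kubernetes does expand `$(VAR_NAME)` references to variables defined earlier in the same `env` list, so the `command`/`args` override may not be needed. Only the two relevant entries are shown here; I have not verified this against the Confluent image.

```yaml
env:
  - name: KAFKA_POD_IP
    valueFrom:
      fieldRef:
        fieldPath: status.podIP
  # Must be listed after KAFKA_POD_IP: $(VAR) only resolves
  # variables defined earlier in the same env list.
  - name: KAFKA_ADVERTISED_LISTENERS
    value: PLAINTEXT://$(KAFKA_POD_IP):9092
```

Advertising the pod IP still breaks whenever the pod is rescheduled and gets a new address, so this remains a workaround.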
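The preferred solution is to create a StatefulSet for your Kafka servers. You will then be able to reference each pod by its FQDN, which is built from the pod name and the StatefulSet's governing (headless) service:
pod-name.service-name.namespace.svc.cluster.local
This also gives you a more stable environment if a server goes down: the StatefulSet brings the pod back up with the same name, so your advertised listeners stay valid.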
https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/
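As a rough illustration of the StatefulSet approach, here is a minimal sketch that reuses the Bitnami environment variables from the question. The names `kafka-headless`, `kafka` (the StatefulSet) and `POD_NAME` are hypothetical choices made for this example, and the manifest has not been tested; treat it as a starting point rather than a finished configuration.

```yaml
# Headless Service: gives each StatefulSet pod a stable DNS name.
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless          # hypothetical name
  namespace: kafka
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
    - name: kafka-port
      port: 9092
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka                   # pods are named kafka-0, kafka-1, ...
  namespace: kafka
spec:
  serviceName: kafka-headless   # governing service for pod DNS names
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
        - name: kafka-container
          image: bitnami/kafka:latest
          env:
            - name: POD_NAME    # hypothetical helper variable
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: ALLOW_PLAINTEXT_LISTENER
              value: 'yes'
            - name: KAFKA_CFG_ZOOKEEPER_CONNECT
              value: 'zookeeper-service:2181'
            - name: KAFKA_CFG_LISTENERS
              value: 'PLAINTEXT://:9092'
            # Advertise the stable per-pod FQDN instead of the pod hostname or IP.
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: 'PLAINTEXT://$(POD_NAME).kafka-headless.kafka.svc.cluster.local:9092'
          ports:
            - containerPort: 9092
              name: kafka-port
```

With this in place, a client in any namespace should be able to bootstrap against `kafka-0.kafka-headless.kafka.svc.cluster.local:9092`, and the broker address returned in the metadata is the same resolvable name.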