How to connect your KSQLDB-Cluster on OpenShift to an on-premise kerberized Kafka-cluster
What I want to achieve:
We have an on-premise Kafka cluster. I want to set up KSQLDB in OpenShift and connect it to the brokers of the on-premise Kafka cluster.
The problem:
When I try to start the KSQLDB server with the command "/usr/bin/ksql-server-start /etc/ksqldb/ksql-server.properties", I get the following error message:
[2020-05-14 15:47:48,519] ERROR Failed to start KSQL (io.confluent.ksql.rest.server.KsqlServerMain:60)
io.confluent.ksql.util.KsqlServerException: Could not get Kafka cluster configuration!
at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:90)
at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.isKafkaAuthorizerEnabled(KsqlAuthorizationValidatorFactory.java:81)
at io.confluent.ksql.security.KsqlAuthorizationValidatorFactory.create(KsqlAuthorizationValidatorFactory.java:51)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:624)
at io.confluent.ksql.rest.server.KsqlRestApplication.buildApplication(KsqlRestApplication.java:544)
at io.confluent.ksql.rest.server.KsqlServerMain.createExecutable(KsqlServerMain.java:98)
at io.confluent.ksql.rest.server.KsqlServerMain.main(KsqlServerMain.java:56)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at io.confluent.ksql.services.KafkaClusterUtil.getConfig(KafkaClusterUtil.java:60)
... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1589471268517) timed out at 1589471268518 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
My configuration:
I based my Dockerfile on this image: https://hub.docker.com/r/confluentinc/ksqldb-server. Ports 9092, 9093, 8080, 8082 and 443 are open.
My service YAML looks like this:
kind: Service
apiVersion: v1
metadata:
  name: social-media-dev
  namespace: abc
  selfLink: xyz
  uid: xyz
  resourceVersion: '1'
  creationTimestamp: '2020-05-14T09:47:15Z'
  labels:
    app: social-media-dev
  annotations:
    openshift.io/generated-by: OpenShiftNewApp
spec:
  ports:
    - name: social-media-dev
      protocol: TCP
      port: 9092
      targetPort: 9092
      nodePort: 31364
  selector:
    app: social-media-dev
    deploymentconfig: social-media-dev
  clusterIP: XX.XX.XXX.XXX
  type: LoadBalancer
  externalIPs:
    - XXX.XX.XXX.XXX
  sessionAffinity: None
  externalTrafficPolicy: Cluster
status:
  loadBalancer:
    ingress:
      - ip: XX.XX.XXX.XXX
My ksql-server.properties file contains the following settings:
listeners: http://0.0.0.0:8082
bootstrap.servers: X.X.X.X:9092, X.X.X.Y:9092, X.X.X.Z:9092
What I have tried so far:
I tried to connect from within my pod to one of the brokers, and it worked:
(timeout 1 bash -c '</dev/tcp/X.X.X.X/9092 && echo PORT OPEN || echo PORT CLOSED') 2>/dev/null
Result: PORT OPEN
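The same reachability check can be repeated for every entry in bootstrap.servers rather than a single broker. The following is only a minimal sketch in Python, assuming a Python interpreter is available in the pod; the function names `port_is_open` and `check_brokers` are made up for illustration. Note that an open port only shows that a TCP connection can be established. It says nothing about whether the Kafka client can authenticate or negotiate the security protocol.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


def check_brokers(bootstrap_servers: str, timeout: float = 1.0) -> dict:
    """Map each 'host:port' entry of a bootstrap.servers string to its TCP status."""
    results = {}
    for entry in bootstrap_servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and blank entries
        host, _, port = entry.rpartition(":")
        results[entry] = port_is_open(host, int(port), timeout)
    return results
```

Calling `check_brokers` with the value of bootstrap.servers from ksql-server.properties returns one True/False entry per broker, which makes it easy to spot a single unreachable broker.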
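I also played around with the listener setting, but then the error message just got shorter: it only said "Could not get Kafka cluster configuration!", without the timeout error.
I tried replacing LoadBalancer with NodePort, also without success.
Do you have any idea what I could try next?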
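Update:
After upgrading to Cloudera CDH6, the Cloudera Kafka cluster now also works with Kafka Streams. As a result, I am now able to connect from my KSQLDB cluster in OpenShift to the on-premise Kafka cluster.
I will also describe here my final way of connecting to the kerberized Kafka cluster, since I struggled a lot to get it running:
- Get a Kerberos ticket and establish the connection via SSL
ksql-server.properties (the SASL_SSL part of it):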
security.protocol=SASL_SSL
sasl.mechanism=GSSAPI
ssl.truststore.location=truststore.jks
ssl.truststore.password=password
ssl.truststore.type=JKS
ssl.ca.location=cert
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka" principal="myprincipal";
serviceName="kafka"
producer.ssl.endpoint.identification.algorithm=HTTPS
producer.security.protocol=SASL_SSL
producer.ssl.truststore.location=truststore.jks
producer.ssl.truststore.password=password
producer.sasl.mechanism=GSSAPI
producer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka" principal="myprincipal";
consumer.ssl.endpoint.identification.algorithm=HTTPS
consumer.security.protocol=SASL_SSL
consumer.ssl.truststore.location=truststore.jks
consumer.ssl.truststore.password=password
consumer.sasl.mechanism=GSSAPI
consumer.sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="my.keytab" serviceName="kafka" principal="myprincipal";
- Set the Sentry rules accordingly
HOST=[HOST]->CLUSTER=kafka-cluster->action=idempotentwrite
HOST=[HOST]->TRANSACTIONALID=[ID]->action=describe
HOST=[HOST]->TRANSACTIONALID=[ID]->action=write
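Because the SASL_SSL settings above are repeated for the `producer.` and `consumer.` prefixes, a copy-and-paste slip in one of them is easy to miss. The sketch below is one possible way to compare them, assuming the intent is that the prefixed overrides carry the same values as the unprefixed settings. The helper names `parse_properties` and `find_mismatches` are hypothetical, and the parser is a simplification that only understands `key=value` lines, not the full Java properties syntax.

```python
# Settings that the properties above repeat for producer. and consumer.
SECURITY_KEYS = (
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
    "ssl.truststore.location",
    "ssl.truststore.password",
)


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines; comments and blank lines are skipped."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        # Split on the first '=' only, since JAAS values contain '=' themselves.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def find_mismatches(props: dict, prefixes=("producer.", "consumer.")) -> list:
    """List prefixed security settings that are missing or differ from the base value."""
    problems = []
    for key in SECURITY_KEYS:
        base = props.get(key)
        for prefix in prefixes:
            value = props.get(prefix + key)
            if value is None:
                problems.append(f"{prefix}{key} is missing")
            elif value != base:
                problems.append(f"{prefix}{key} differs from {key}")
    return problems
```

Running `find_mismatches(parse_properties(open("/etc/ksqldb/ksql-server.properties").read()))` returns an empty list when the three groups agree. A stray trailing character on one of the sasl.jaas.config lines, for instance, would show up as a "differs from" entry.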