AWS MSK - 在打开 ACL 的情况下创建 Kafka 主题时超时
AWS MSK - Timeout when creating Kafka topic with ACL turned-on
我正在使用 AWS MSK,我想启用 ACL,但在启用 ACL 时我无法创建主题。我正在使用命令行工具进行所有操作。以下是我正在做的事情的总结:
- 创建一个新集群
- 创建主题 - 这很好用
- 在 resource=CLUSTER 和 operation=ALL 上为 client1 打开 ACL
- 使用 AdminClient 创建主题(通过提供 --bootstrap-server 选项)- 这会产生超时异常
- 重新尝试创建相同的主题 - 这会提示主题已存在的错误
- 使用 AdminClient 列出主题 - 这个returns没有主题
- 使用 Zookeeper 连接创建主题 - 这有效
- 使用 Zookeeper 连接列出主题 - 这是 returns 我创建的所有主题(甚至是那些超时的主题)
所以问题是主题是在 Zookeeper 上创建的,但代理无法访问它。大概是由于我遗漏了一些 ACL 规则。
我的命令的原始输出 运行:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted du
e to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=10=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
(kafka.admin.TopicCommand$)
运行 同样的命令:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic
'test3' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=11=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
(kafka.admin.TopicCommand$)
通过 AdminClient 的主题列表:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list
通过 Zookeeper 连接的主题列表:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5
这是我的 ACL 规则:
Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
我错过了什么?
我认为这与 AWS MSK 没有任何关系,而是您的 Kafka 安全集群配置存在问题。客户端 (subscribers/producers) 和代理间操作都需要在安全集群中进行授权。您在非托管 Kafka 集群中也会遇到同样的问题。
建议在服务器上设置一个 "superuser" 用户(我称他们为服务帐户),然后为这些 "superuser" 用户提供允许您需要的经纪人间交互的 ACL为您的集群。您需要的确切 ACL 将根据您的用例和安全首选项而有所不同。
在 server.properties
中,您将添加一个条目,如 super.users=User:BrokerService
,并记录在
https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser。文档建议使用 Alice 和 Bob 作为超级用户名,这让我感到困惑。选择对您有意义的用户名。
然后您需要设置一个类似的 ACL,该 ACL 将用户名主体与您在上面创建的 "superuser" 用户一起使用,例如principal=User:BrokerService
。 ACL 将授予代理所需的任何权限。您的直接用例是允许阅读它听起来像的所有主题。您可能还需要其他 ACL 来进行代理间通信,但是如果没有关于您想要做什么的更多信息,我无法告诉您您到底需要什么。
例如,此命令用于设置 ACL。
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster
此处记录了用于设置 ACL 的更多选项以及对您的具体问题的描述 https://docs.confluent.io/current/kafka/authorization.html#acl-format
如果您正在寻找要在此处使用的确切配置,请再次研究更多或编辑您的问题,因为对您使用的 ACL 具有安全性和用例影响。
我正在使用 AWS MSK,我想启用 ACL,但在启用 ACL 时我无法创建主题。我正在使用命令行工具进行所有操作。以下是我正在做的事情的总结:
- 创建一个新集群
- 创建主题 - 这很好用
- 在 resource=CLUSTER 和 operation=ALL 上为 client1 打开 ACL
- 使用 AdminClient 创建主题(通过提供 --bootstrap-server 选项)- 这会产生超时异常
- 重新尝试创建相同的主题 - 这会提示主题已存在的错误
- 使用 AdminClient 列出主题 - 这个returns没有主题
- 使用 Zookeeper 连接创建主题 - 这有效
- 使用 Zookeeper 连接列出主题 - 这是 returns 我创建的所有主题(甚至是那些超时的主题)
所以问题是主题是在 Zookeeper 上创建的,但代理无法访问它。大概是由于我遗漏了一些 ACL 规则。
我的命令的原始输出 运行:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
[2019-09-30 17:16:19,389] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted du
e to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=10=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
(kafka.admin.TopicCommand$)
运行 同样的命令:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties \
--create --topic test3 --partitions 1 --replication-factor 1
Error while executing topic command : org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
[2019-09-30 17:25:38,266] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic
'test3' already exists.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access[=11=]0(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicExistsException: Topic 'test3' already exists.
(kafka.admin.TopicCommand$)
通过 AdminClient 的主题列表:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --bootstrap-server $B --command-config ~/client1.properties --list
通过 Zookeeper 连接的主题列表:
ubuntu@ip-172-31-27-70:~/kafka_2.12-2.2.1/bin$ ./kafka-topics.sh --zookeeper $ZK --command-config ~/client1.properties --list
test
test2
test3
test4
test5
这是我的 ACL 规则:
Current ACLs for resource `ResourcePattern(resourceType=CLUSTER, name=kafka-cluster, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=--operation=All, patternType=LITERAL)`:
(principal=User:CN=client1.com, host=*, operation=ALL, permissionType=ALLOW)
我错过了什么?
我认为这与 AWS MSK 没有任何关系,而是您的 Kafka 安全集群配置存在问题。客户端 (subscribers/producers) 和代理间操作都需要在安全集群中进行授权。您在非托管 Kafka 集群中也会遇到同样的问题。
建议在服务器上设置一个 "superuser" 用户(我称他们为服务帐户),然后为这些 "superuser" 用户提供允许您需要的经纪人间交互的 ACL为您的集群。您需要的确切 ACL 将根据您的用例和安全首选项而有所不同。
在 server.properties
中,您将添加一个条目,如 super.users=User:BrokerService
,并记录在
https://docs.confluent.io/current/kafka/authorization.html#kafka-auth-superuser。文档建议使用 Alice 和 Bob 作为超级用户名,这让我感到困惑。选择对您有意义的用户名。
然后您需要设置一个类似的 ACL,该 ACL 将用户名主体与您在上面创建的 "superuser" 用户一起使用,例如principal=User:BrokerService
。 ACL 将授予代理所需的任何权限。您的直接用例是允许阅读它听起来像的所有主题。您可能还需要其他 ACL 来进行代理间通信,但是如果没有关于您想要做什么的更多信息,我无法告诉您您到底需要什么。
例如,此命令用于设置 ACL。
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add \
--allow-principal User:BrokerService --operation All --topic '*' --cluster
此处记录了用于设置 ACL 的更多选项以及对您的具体问题的描述 https://docs.confluent.io/current/kafka/authorization.html#acl-format
如果您正在寻找要在此处使用的确切配置,请再次研究更多或编辑您的问题,因为对您使用的 ACL 具有安全性和用例影响。