How to block unauthorized users from creating/deleting Kafka topics?

I have set up authentication between Kafka and ZooKeeper using SASL + ACLs, and between Kafka and the producers/consumers using two-way SSL authentication (including encryption).

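With SASL and ACLs enabled between Kafka and ZooKeeper, unauthorized Kafka brokers cannot log in to the ZooKeeper cluster. However, topics can still be created and deleted without any restriction.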

zookeeper.properties

dataDir=/x02/lsesv2-s/data/Zookeeper
clientPort=15300
tickTime=2000
initLimit=10
syncLimit=5

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer

server.1=172.25.33.12:15302:15301
server.2=172.25.33.13:15302:15301
server.3=172.25.33.11:15302:15301

zookeeper_jaas.conf

Server {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="admin"
        password="abc123"
        user_admin="abc123";
};

QuorumServer {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        user_admin="abc123";
};

QuorumLearner {
        org.apache.zookeeper.server.auth.DigestLoginModule required
        username="admin"
        password="abc123";
};

The ACL is set with the following code:

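// Requires: java.nio.charset.StandardCharsets, java.util.concurrent.CountDownLatch,
//           org.apache.zookeeper.Watcher, org.apache.zookeeper.ZooKeeper
final CountDownLatch connectedSignal = new CountDownLatch(1);
String connect = "localhost:15300";
ZooKeeper zooKeeper = null;

try
{
    String userName = "admin";
    String password = "mit123";

    // Open the session and release the latch once the client is connected.
    zooKeeper = new ZooKeeper(connect, 5000, we ->
    {
        if (we.getState() == Watcher.Event.KeeperState.SyncConnected)
        {
            connectedSignal.countDown();
        }
    });

    connectedSignal.await();

    // Attach digest credentials to this session.
    zooKeeper.addAuthInfo("digest", (userName + ":" + password).getBytes(StandardCharsets.UTF_8));

    // Grant cdrwa (create, delete, read, write, admin) to the digest user and the SASL user.
    final String aclString = "auth:" + userName + ":" + password + ":" + "cdrwa" +
            ",sasl:" + userName + ":" + "cdrwa";

    // parseACLs (not shown) turns the string above into a List<ACL>.
    // -1 skips the ACL version check. This sets the ACL on the "/" znode itself.
    zooKeeper.setACL("/", parseACLs(aclString), -1);
}
finally
{
    if (zooKeeper != null)
    {
        zooKeeper.close();
    }
}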

The code above works. Here is the result after running it:

Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
getAcl /
'sasl,'admin
: cdrwa
'digest,'admin:oiasY+rmnmmK9mec8kpnvv281HE=
: cdrwa
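For reference, the digest entry in the output above is not the plain password. As far as I know, with its default settings ZooKeeper's digest scheme stores the id as user:base64(SHA-1(user:password)). A minimal sketch using only the JDK follows; the class and method names are hypothetical and are not part of the ZooKeeper API, and the credentials are the ones from the code above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class DigestIdSketch {
    // Builds "user:base64(sha1(user:password))", the form shown by getAcl for digest entries.
    // Assumption: the server uses the default SHA-1 digest algorithm.
    static String digestId(String user, String password) throws NoSuchAlgorithmException {
        String idPassword = user + ":" + password;
        byte[] sha1 = MessageDigest.getInstance("SHA-1")
                .digest(idPassword.getBytes(StandardCharsets.UTF_8));
        return user + ":" + Base64.getEncoder().encodeToString(sha1);
    }

    public static void main(String[] args) throws Exception {
        // Prints an entry in the same shape as the getAcl line above.
        System.out.println("'digest,'" + digestId("admin", "mit123"));
    }
}
```

Comparing the printed value with the getAcl output is a quick way to check which password the ACL was actually created with.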

I override the Kafka properties at startup rather than setting them in the server.properties file.

Kafka properties

kafka/bin/kafka-server-start.sh /x02/lsesv2-s/current/kafka/config/server.properties \
--override broker.id=1 \
--override zookeeper.connect=10g-flton-onl01:15300,10g-flton-onl02:15300,10g-flton-nor02:15300 \
--override num.network.threads=16 \
--override num.io.threads=16 \
--override socket.send.buffer.bytes=10240000 \
--override socket.receive.buffer.bytes=10240000 \
--override log.dirs=/x02/lsesv2-s/data/Kafka \
--override offsets.topic.replication.factor=1 \
--override min.insync.replicas=1 \
--override inter.broker.listener.name=INTERNAL \
--override listeners=INTERNAL://10g-flton-onl01:15307 \
--override advertised.listeners=INTERNAL://10g-flton-onl01:15307 \
--override listener.security.protocol.map=INTERNAL:SSL \
--override security.protocol=SSL \
--override ssl.client.auth=required \
--override ssl.key.password=abc123 \
--override ssl.keystore.location=configs/MHV/kafka.server.keystore.jks \
--override ssl.keystore.password=abc123 \
--override ssl.truststore.location=configs/MHV/kafka.server.truststore.jks \
--override ssl.truststore.password=abc123 \
--override ssl.endpoint.identification.algorithm=

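Authentication between Kafka and the producers/consumers works fine, and so does authentication between ZooKeeper and Kafka. However, unauthorized users can still create and delete topics.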

Topic creation

kafka/bin/kafka-topics.sh --create --zookeeper localhost:15300 --replication-factor 3 --partitions 8 --topic test

Topic deletion

kafka/bin/kafka-topics.sh --zookeeper localhost:15300 --delete --topic test

Note: I did not set -Djava.security.auth.login.config=kafka_server_jaas.conf when creating or deleting the topics, so these operations should have been rejected. In practice they are not.

Please help me restrict topic creation and deletion to authorized users only.

From local testing, this seems to be the property that is required:

 KAFKA_ZOOKEEPER_SET_ACL: "true"

It is also available directly with the Confluent images, or maps to the broker property

zookeeper.set.acl

Reference

Also, as stated in Kafka 101 (Confluent):

the metadata stored in ZooKeeper is such that only brokers will be able to modify the corresponding znodes, but znodes are world readable. 
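To make "world readable, but only brokers can modify" concrete, here is a toy model of how a znode ACL gates operations. It is only a sketch and not ZooKeeper's implementation: the class name and the example identities are hypothetical, and only the permission bit values are taken from ZooKeeper's ZooDefs.Perms. In ZooKeeper, creating or deleting a child is checked against the parent znode's ACL (CREATE and DELETE bits), and ACLs are not inherited from "/", so the ACL on a parent such as /brokers/topics is what matters for topic creation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ZnodeAclSketch {
    // Permission bits, same values as ZooKeeper's ZooDefs.Perms.
    static final int READ = 1, WRITE = 2, CREATE = 4, DELETE = 8, ADMIN = 16;
    static final int ALL = READ | WRITE | CREATE | DELETE | ADMIN;

    // Toy ACL of a single znode: "scheme:id" -> permission bits.
    private final Map<String, Integer> acl = new LinkedHashMap<>();

    ZnodeAclSketch grant(String identity, int perms) {
        acl.put(identity, perms);
        return this;
    }

    // A request passes if the caller's own entry or the world:anyone entry grants the bit.
    boolean allows(String identity, int perm) {
        int granted = acl.getOrDefault(identity, 0) | acl.getOrDefault("world:anyone", 0);
        return (granted & perm) != 0;
    }

    public static void main(String[] args) {
        // Hypothetical ACL on a secured parent znode: full rights for the
        // SASL user, read-only for everyone else.
        ZnodeAclSketch topics = new ZnodeAclSketch()
                .grant("sasl:admin", ALL)
                .grant("world:anyone", READ);

        System.out.println(topics.allows("sasl:admin", CREATE));   // true
        System.out.println(topics.allows("world:anyone", CREATE)); // false
        System.out.println(topics.allows("world:anyone", READ));   // true
    }
}
```

If the parent znode still carries an open world:anyone entry with all permissions, an unauthenticated client passes the same check, which would match the behaviour described in the question.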

Because we configured ZooKeeper to require SASL authentication, we need to set the java.security.auth.login.config system property while starting the kafka-topics tool:
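For a Java client, the same system property can be set in code before the ZooKeeper connection is opened. The following is a minimal sketch under a few assumptions: the class name and the temp-file approach are hypothetical, the credentials are the ones from the Server section of zookeeper_jaas.conf in the question, and the section is named Client because that is the login context the ZooKeeper client looks up by default.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class ClientJaasSketch {
    // Writes a JAAS file with a "Client" section and points the JVM at it.
    static Path configure(String user, String password) throws IOException {
        String jaas = "Client {\n"
                + "    org.apache.zookeeper.server.auth.DigestLoginModule required\n"
                + "    username=\"" + user + "\"\n"
                + "    password=\"" + password + "\";\n"
                + "};\n";
        Path file = Files.createTempFile("zk_client_jaas", ".conf");
        Files.write(file, jaas.getBytes(StandardCharsets.UTF_8));
        // Must happen before the first ZooKeeper client is created in this JVM.
        System.setProperty("java.security.auth.login.config", file.toString());
        return file;
    }

    public static void main(String[] args) throws IOException {
        Path file = configure("admin", "abc123");
        System.out.println("JAAS config written to " + file);
    }
}
```

For the command-line tools, the usual equivalent is to pass -Djava.security.auth.login.config=... through the KAFKA_OPTS environment variable before running kafka-topics.sh.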

A code sample and a docker-compose file are shown here