How to block unauthorized users from creating/deleting Kafka topics?
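I have set up authentication between Kafka and ZooKeeper using SASL + ACLs, and mutual SSL authentication (with encryption) between Kafka and the producers and consumers.
With SASL and ACLs enabled between Kafka and ZooKeeper, unauthorized Kafka brokers cannot log in to the ZooKeeper cluster. However, topics can still be created and deleted without any restriction.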
zookeeper.properties
dataDir=/x02/lsesv2-s/data/Zookeeper
clientPort=15300
tickTime=2000
initLimit=10
syncLimit=5
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
quorum.auth.enableSasl=true
quorum.auth.learnerRequireSasl=true
quorum.auth.serverRequireSasl=true
quorum.auth.learner.loginContext=QuorumLearner
quorum.auth.server.loginContext=QuorumServer
server.1=172.25.33.12:15302:15301
server.2=172.25.33.13:15302:15301
server.3=172.25.33.11:15302:15301
zookeeper_jaas.conf
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="abc123"
user_admin="abc123";
};
QuorumServer {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="abc123";
};
QuorumLearner {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="abc123";
};
The ACL is set with the following code:
final CountDownLatch connectedSignal = new CountDownLatch(1);
String connect = "localhost:15300";
ZooKeeper zooKeeper = null;
try
{
    String userName = "admin";
    String password = "mit123";
    zooKeeper = new ZooKeeper(connect, 5000, we ->
    {
        if (we.getState() == Watcher.Event.KeeperState.SyncConnected)
        {
            connectedSignal.countDown();
        }
    });
    connectedSignal.await();
    zooKeeper.addAuthInfo("digest", (userName + ":" + password).getBytes());
    final String aclString = "auth:" + userName + ":" + password + ":" + "cdrwa" +
            ",sasl:" + userName + ":" + "cdrwa";
    zooKeeper.setACL("/", parseACLs(aclString), -1);
} finally
{
    if (zooKeeper != null)
    {
        zooKeeper.close();
    }
}
The code above works; this is the result after running it:
Welcome to ZooKeeper!
JLine support is disabled
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
getAcl /
'sasl,'admin
: cdrwa
'digest,'admin:oiasY+rmnmmK9mec8kpnvv281HE=
: cdrwa
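For anyone wondering where the 'digest,'admin:... value in that output comes from: with ZooKeeper's default digest scheme, the stored id is the user name followed by the Base64-encoded SHA-1 hash of the "user:password" string. Below is a minimal sketch of that derivation using only the JDK. The class name ZkDigestSketch is made up for illustration, and the two candidate passwords are simply the two that appear in the question (abc123 in the JAAS file, mit123 in the ACL code); comparing the printed values with the getAcl output should show which credential the ACL was actually created with.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper class, not part of ZooKeeper or Kafka.
final class ZkDigestSketch {

    // Builds "<user>:" + Base64(SHA-1("<user>:<password>")), the form
    // ZooKeeper stores for digest ACL ids by default.
    static String generateDigest(String idPassword) {
        String user = idPassword.split(":", 2)[0];
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(idPassword.getBytes(StandardCharsets.UTF_8));
            return user + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is expected to be available on standard JDKs.
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // Candidate credentials taken from the question; compare each result
        // with the 'digest,'admin:... entry printed by getAcl.
        for (String candidate : new String[] {"admin:abc123", "admin:mit123"}) {
            System.out.println(candidate + " -> " + generateDigest(candidate));
        }
    }
}
```

Only a session that authenticates with the matching credential (or the matching SASL principal) is covered by these ACL entries.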
I override the Kafka properties at startup rather than in the server.properties file.
Kafka properties
kafka/bin/kafka-server-start.sh /x02/lsesv2-s/current/kafka/config/server.properties \
  --override broker.id=1 \
  --override zookeeper.connect=10g-flton-onl01:15300,10g-flton-onl02:15300,10g-flton-nor02:15300 \
  --override num.network.threads=16 \
  --override num.io.threads=16 \
  --override socket.send.buffer.bytes=10240000 \
  --override socket.receive.buffer.bytes=10240000 \
  --override log.dirs=/x02/lsesv2-s/data/Kafka \
  --override offsets.topic.replication.factor=1 \
  --override min.insync.replicas=1 \
  --override inter.broker.listener.name=INTERNAL \
  --override listeners=INTERNAL://10g-flton-onl01:15307 \
  --override advertised.listeners=INTERNAL://10g-flton-onl01:15307 \
  --override listener.security.protocol.map=INTERNAL:SSL \
  --override security.protocol=SSL \
  --override ssl.client.auth=required \
  --override ssl.key.password=abc123 \
  --override ssl.keystore.location=configs/MHV/kafka.server.keystore.jks \
  --override ssl.keystore.password=abc123 \
  --override ssl.truststore.location=configs/MHV/kafka.server.truststore.jks \
  --override ssl.truststore.password=abc123 \
  --override ssl.endpoint.identification.algorithm=
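Authentication between Kafka and the producers/consumers works fine, and so does authentication between ZooKeeper and Kafka. However, unauthorized users can still create and delete topics.
Topic creation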
kafka/bin/kafka-topics.sh --create --zookeeper localhost:15300 --replication-factor 3 --partitions 8 --topic test
Topic deletion
kafka/bin/kafka-topics.sh --zookeeper localhost:15300 --delete --topic test
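Note: I did not set -Djava.security.auth.login.config=kafka_server_jaas.conf when creating or deleting the topic, so these operations should have been rejected. But they were not.
Please help me restrict topic creation and deletion to authorized users only.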
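From local testing, this appears to be the required property:
KAFKA_ZOOKEEPER_SET_ACL: "true"
if you are using the Confluent images, or, set directly on the broker:
zookeeper.set.acl
As the documentation states: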
the metadata stored in ZooKeeper is such that only brokers will be able to modify the corresponding znodes, but znodes are world readable.
Because we configured ZooKeeper to require SASL authentication, we need to set the java.security.auth.login.config system property while starting the kafka-topics tool:
A code example and docker-compose file are shown here
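To make the java.security.auth.login.config part more concrete, here is a rough sketch of a client-side JAAS file and of how a JVM picks it up through that system property. Everything named here is an assumption for illustration: the class ZkClientJaasSketch, the temp file name, and the credentials (admin / abc123, copied from the user_admin entry in the question's Server section). "Client" is the section name the ZooKeeper client looks for by default. The sketch only checks that the file parses and exposes the expected options; it does not connect to ZooKeeper.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical pre-flight check, not part of Kafka or ZooKeeper.
final class ZkClientJaasSketch {

    // Assumed client JAAS content; credentials mirror the question's
    // user_admin="abc123" server entry.
    static final String CLIENT_JAAS = String.join("\n",
            "Client {",
            "  org.apache.zookeeper.server.auth.DigestLoginModule required",
            "  username=\"admin\"",
            "  password=\"abc123\";",
            "};",
            "");

    // Writes the JAAS content to a temp file, points the JVM at it via the
    // system property, and returns the options of the "Client" section.
    static Map<String, ?> loadClientOptions(String jaasContent) {
        try {
            Path jaasFile = Files.createTempFile("zk_client_jaas", ".conf");
            Files.write(jaasFile, jaasContent.getBytes(StandardCharsets.UTF_8));
            System.setProperty("java.security.auth.login.config", jaasFile.toString());

            Configuration config = Configuration.getConfiguration();
            config.refresh(); // re-read the file named by the system property

            AppConfigurationEntry[] entries = config.getAppConfigurationEntry("Client");
            if (entries == null || entries.length == 0) {
                throw new IllegalStateException("No Client section found in " + jaasFile);
            }
            return entries[0].getOptions();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, ?> options = loadClientOptions(CLIENT_JAAS);
        System.out.println("Client section user: " + options.get("username"));
    }
}
```

With the command-line tools, the same property is usually passed to the JVM through the KAFKA_OPTS environment variable (for example -Djava.security.auth.login.config=/path/to/client_jaas.conf, path hypothetical) before running kafka-topics.sh. A session started without it is unauthenticated, and once the znodes carry broker-only ACLs its create and delete requests should be refused.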