Kafka Streams - 内部主题的 ACL
Kafka Streams - ACLs for Internal Topics
我正在尝试设置一个安全的 Kafka 集群,但在使用 ACL 时遇到了一些困难。
The Confluent security guide for Kafka Streams (https://docs.confluent.io/current/streams/developer-guide/security.html) 只是指出必须将 Cluster Create ACL 提供给主体......但它没有说明如何实际处理内部主题。
通过研究和实验,我确定(对于 Kafka 版本 1.0.0):
- 通配符不能与 ACL 中的主题名称文本一起使用。例如,由于所有内部主题都以应用程序 ID 为前缀,我的第一个想法是将 acl 应用于匹配“
-*”的主题。这行不通。
- 由 Streams 创建的主题 API 不会自动向创建者授予 read/write 访问权限。
内部主题的确切名称是否可预测且一致?换句话说,如果我 运行 我的应用程序在开发服务器上,当 运行 时是否会在生产服务器上创建完全相同的主题?如果是这样,那么我可以在部署之前添加从 dev 派生的 ACL。如果没有,应该如何添加ACL?
Are the exact names of the internal topics predictable and consistent? In other words, if I run my application on a dev server, will the exact same topics be created on the production server when run?
是的,您将从 运行 运行 中获得完全相同的主题名称。 DSL 生成的处理器名称 with a function 如下所示:
public String newProcessorName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}
(其中 index
只是一个递增的整数)。然后将这些处理器名称用于 create repartition topics 函数,如下所示(参数 name
是如上生成的处理器名称):
static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
final Serde<K1> keySerde,
final Serde<V1> valSerde,
final String topicNamePrefix,
final String name) {
Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
String baseName = topicNamePrefix != null ? topicNamePrefix : name;
String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
String sinkName = builder.newProcessorName(SINK_NAME);
String filterName = builder.newProcessorName(FILTER_NAME);
String sourceName = builder.newProcessorName(SOURCE_NAME);
builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
@Override
public boolean test(final K1 key, final V1 value) {
return key != null;
}
}, false), name);
builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
null, filterName);
builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
keyDeserializer, valDeserializer, repartitionTopic);
return sourceName;
}
如果你不改变你的拓扑——比如,如果不改变它的构建顺序等等——无论在哪里构建拓扑,你都会得到相同的结果(假设你正在使用相同版本的 Kafka Streams)。
If so, then I can just add ACLs derived from dev before deploying. If not, how should the ACLs be added?
我没有使用过 ACL,但我想既然这些只是常规主题,那么是的,您可以对它们应用 ACL。 security guide 确实提到:
When applications are run against a secured Kafka cluster, the principal running the application must have the ACL --cluster --operation Create set so that the application has the permissions to create internal topics.
不过,我自己也一直在想这个问题,所以如果我错了,我猜 Confluent 的人会纠正我。
我正在尝试设置一个安全的 Kafka 集群,但在使用 ACL 时遇到了一些困难。
The Confluent security guide for Kafka Streams (https://docs.confluent.io/current/streams/developer-guide/security.html) 只是指出必须将 Cluster Create ACL 提供给主体......但它没有说明如何实际处理内部主题。
通过研究和实验,我确定(对于 Kafka 版本 1.0.0):
- 通配符不能与 ACL 中的主题名称文本一起使用。例如,由于所有内部主题都以应用程序 ID 为前缀,我的第一个想法是将 acl 应用于匹配“
-*”的主题。这行不通。 - 由 Streams 创建的主题 API 不会自动向创建者授予 read/write 访问权限。
内部主题的确切名称是否可预测且一致?换句话说,如果我 运行 我的应用程序在开发服务器上,当 运行 时是否会在生产服务器上创建完全相同的主题?如果是这样,那么我可以在部署之前添加从 dev 派生的 ACL。如果没有,应该如何添加ACL?
Are the exact names of the internal topics predictable and consistent? In other words, if I run my application on a dev server, will the exact same topics be created on the production server when run?
是的,您将从 运行 运行 中获得完全相同的主题名称。 DSL 生成的处理器名称 with a function 如下所示:
public String newProcessorName(final String prefix) {
return prefix + String.format("%010d", index.getAndIncrement());
}
(其中 index
只是一个递增的整数)。然后将这些处理器名称用于 create repartition topics 函数,如下所示(参数 name
是如上生成的处理器名称):
static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
final Serde<K1> keySerde,
final Serde<V1> valSerde,
final String topicNamePrefix,
final String name) {
Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
String baseName = topicNamePrefix != null ? topicNamePrefix : name;
String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
String sinkName = builder.newProcessorName(SINK_NAME);
String filterName = builder.newProcessorName(FILTER_NAME);
String sourceName = builder.newProcessorName(SOURCE_NAME);
builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
@Override
public boolean test(final K1 key, final V1 value) {
return key != null;
}
}, false), name);
builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
null, filterName);
builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
keyDeserializer, valDeserializer, repartitionTopic);
return sourceName;
}
如果你不改变你的拓扑——比如,如果不改变它的构建顺序等等——无论在哪里构建拓扑,你都会得到相同的结果(假设你正在使用相同版本的 Kafka Streams)。
If so, then I can just add ACLs derived from dev before deploying. If not, how should the ACLs be added?
我没有使用过 ACL,但我想既然这些只是常规主题,那么是的,您可以对它们应用 ACL。 security guide 确实提到:
When applications are run against a secured Kafka cluster, the principal running the application must have the ACL --cluster --operation Create set so that the application has the permissions to create internal topics.
不过,我自己也一直在想这个问题,所以如果我错了,我猜 Confluent 的人会纠正我。