具有多个加密密钥提供程序的 EMR
EMR with multiple encryption key providers
我是 运行 EMR 集群,启用了 s3 client-side encryption 使用自定义密钥提供程序。但现在我需要使用不同的加密模式将数据写入多个 s3 目的地:
- CSE 自定义密钥提供程序
- CSE-KMS
是否可以通过定义 s3 存储桶和加密类型之间的某种映射来配置 EMR 使用两种加密类型?
或者,因为我使用 spark 结构化流来处理数据并将数据写入 s3,所以我想知道是否可以在 EMRFS 上禁用加密,然后分别为每个流启用 CSE?
我不能代表 Amazon EMR,但在 hadoop 的 s3a 连接器上,您可以逐个桶地设置加密策略。但是,S3A 不支持客户端加密,因为它打破了关于文件长度的基本假设(您可以读取的数据量必须 == 目录 listing/getFileStatus 调用中的长度)。
我希望亚马逊也能做类似的事情。您可以创建具有不同设置的自定义 Hadoop Configuration
对象,并使用它来检索用于保存内容的文件系统实例。不过在 Spark 中比较棘手。
想法是支持任何文件系统方案并单独配置它。例如:
# custom encryption key provider
fs.s3x.cse.enabled = true
fs.s3x.cse.materialsDescription.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider
#no encryption
fs.s3u.cse.enabled = false
#AWS KMS
fs.s3k.cse.enabled = true
fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
fs.s3k.cse.kms.keyId = some-kms-id
然后像这样在 spark 中使用它:
StreamingQuery writeStream = session
.readStream()
.schema(RecordSchema.fromClass(TestRecord.class))
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv(“s3x://aws-s3-bucket/input”)
.as(Encoders.bean(TestRecord.class))
.writeStream()
.outputMode(OutputMode.Append())
.format("parquet")
.option("path", “s3k://aws-s3-bucket/output”)
.option("checkpointLocation", “s3u://aws-s3-bucket/checkpointing”)
.start();
为了解决这个问题,我实现了一个自定义 Hadoop 文件系统(扩展 org.apache.hadoop.fs.FileSystem
),该系统将调用委托给真实文件系统,但配置已修改。
// Create delegate FS
this.config.set("fs.s3n.impl", “com.amazon.ws.emr.hadoop.fs.EmrFileSystem”);
this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));
传递给委派文件系统的配置应采用所有原始设置并将出现的任何 fs.s3*.
替换为 fs.s3n.
。
private Configuration substituteS3Config(final Configuration conf) {
if (conf == null) return null;
final String fsSchemaPrefix = "fs." + getScheme() + ".";
final String fsS3SchemaPrefix = "fs.s3.";
final String fsSchemaImpl = "fs." + getScheme() + ".impl";
Configuration substitutedConfig = new Configuration(conf);
for (Map.Entry<String, String> configEntry : conf) {
String propName = configEntry.getKey();
if (!fsSchemaImpl.equals(propName)
&& propName.startsWith(fsSchemaPrefix)) {
final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
substitutedConfig.set(newPropName, configEntry.getValue());
}
}
return substitutedConfig;
}
除此之外确保委托 fs 接收 uris 和支持方案的路径以及 returns 自定义方案的路径
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
if (status != null) {
status.setPath(customS3Path(status.getPath()));
}
return status;
}
private Path s3Path(final Path p) {
if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
return new Path(s3nURI(p.toUri(), SCHEME_S3N));
}
return p;
}
private Path customS3Path(final Path p) {
if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
return new Path(s3nURI(p.toUri(), getScheme()));
}
return p;
}
private URI s3nURI(final URI originalUri, final String newScheme) {
try {
return new URI(
newScheme,
originalUri.getUserInfo(),
originalUri.getHost(),
originalUri.getPort(),
originalUri.getPath(),
originalUri.getQuery(),
originalUri.getFragment());
} catch (URISyntaxException e) {
LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
}
return originalUri;
}
最后一步是向 Hadoop 注册自定义文件系统(spark-defaults
分类)
spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem
当您使用 EMRFS 时,您可以按以下格式指定 per-bucket 配置:
fs.s3.bucket.<bucket name>.<some.configuration>
因此,例如,要关闭除存储桶 s3://foobar
之外的 CSE,您可以设置:
"Classification": "emrfs-site",
"Properties": {
"fs.s3.cse.enabled": "false",
"fs.s3.bucket.foobar.cse.enabled": "true",
[your other configs as usual]
}
请注意,它必须是 fs.s3
而不是像 fs.s3n
那样的 fs.{arbitrary-scheme}
。
我是 运行 EMR 集群,启用了 s3 client-side encryption 使用自定义密钥提供程序。但现在我需要使用不同的加密模式将数据写入多个 s3 目的地:
- CSE 自定义密钥提供程序
- CSE-KMS
是否可以通过定义 s3 存储桶和加密类型之间的某种映射来配置 EMR 使用两种加密类型?
或者,因为我使用 spark 结构化流来处理数据并将数据写入 s3,所以我想知道是否可以在 EMRFS 上禁用加密,然后分别为每个流启用 CSE?
我不能代表 Amazon EMR,但在 hadoop 的 s3a 连接器上,您可以逐个桶地设置加密策略。但是,S3A 不支持客户端加密,因为它打破了关于文件长度的基本假设(您可以读取的数据量必须 == 目录 listing/getFileStatus 调用中的长度)。
我希望亚马逊也能做类似的事情。您可以创建具有不同设置的自定义 Hadoop Configuration
对象,并使用它来检索用于保存内容的文件系统实例。不过在 Spark 中比较棘手。
想法是支持任何文件系统方案并单独配置它。例如:
# custom encryption key provider
fs.s3x.cse.enabled = true
fs.s3x.cse.materialsDescription.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider
#no encryption
fs.s3u.cse.enabled = false
#AWS KMS
fs.s3k.cse.enabled = true
fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
fs.s3k.cse.kms.keyId = some-kms-id
然后像这样在 spark 中使用它:
StreamingQuery writeStream = session
.readStream()
.schema(RecordSchema.fromClass(TestRecord.class))
.option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
.option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
.csv(“s3x://aws-s3-bucket/input”)
.as(Encoders.bean(TestRecord.class))
.writeStream()
.outputMode(OutputMode.Append())
.format("parquet")
.option("path", “s3k://aws-s3-bucket/output”)
.option("checkpointLocation", “s3u://aws-s3-bucket/checkpointing”)
.start();
为了解决这个问题,我实现了一个自定义 Hadoop 文件系统(扩展 org.apache.hadoop.fs.FileSystem
),该系统将调用委托给真实文件系统,但配置已修改。
// Create delegate FS
this.config.set("fs.s3n.impl", “com.amazon.ws.emr.hadoop.fs.EmrFileSystem”);
this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));
传递给委派文件系统的配置应采用所有原始设置并将出现的任何 fs.s3*.
替换为 fs.s3n.
。
private Configuration substituteS3Config(final Configuration conf) {
if (conf == null) return null;
final String fsSchemaPrefix = "fs." + getScheme() + ".";
final String fsS3SchemaPrefix = "fs.s3.";
final String fsSchemaImpl = "fs." + getScheme() + ".impl";
Configuration substitutedConfig = new Configuration(conf);
for (Map.Entry<String, String> configEntry : conf) {
String propName = configEntry.getKey();
if (!fsSchemaImpl.equals(propName)
&& propName.startsWith(fsSchemaPrefix)) {
final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
substitutedConfig.set(newPropName, configEntry.getValue());
}
}
return substitutedConfig;
}
除此之外确保委托 fs 接收 uris 和支持方案的路径以及 returns 自定义方案的路径
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
if (status != null) {
status.setPath(customS3Path(status.getPath()));
}
return status;
}
private Path s3Path(final Path p) {
if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
return new Path(s3nURI(p.toUri(), SCHEME_S3N));
}
return p;
}
private Path customS3Path(final Path p) {
if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
return new Path(s3nURI(p.toUri(), getScheme()));
}
return p;
}
private URI s3nURI(final URI originalUri, final String newScheme) {
try {
return new URI(
newScheme,
originalUri.getUserInfo(),
originalUri.getHost(),
originalUri.getPort(),
originalUri.getPath(),
originalUri.getQuery(),
originalUri.getFragment());
} catch (URISyntaxException e) {
LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
}
return originalUri;
}
最后一步是向 Hadoop 注册自定义文件系统(spark-defaults
分类)
spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem
当您使用 EMRFS 时,您可以按以下格式指定 per-bucket 配置:
fs.s3.bucket.<bucket name>.<some.configuration>
因此,例如,要关闭除存储桶 s3://foobar
之外的 CSE,您可以设置:
"Classification": "emrfs-site",
"Properties": {
"fs.s3.cse.enabled": "false",
"fs.s3.bucket.foobar.cse.enabled": "true",
[your other configs as usual]
}
请注意,它必须是 fs.s3
而不是像 fs.s3n
那样的 fs.{arbitrary-scheme}
。