具有多个加密密钥提供程序的 EMR

EMR with multiple encryption key providers

我是 运行 EMR 集群,启用了 s3 client-side encryption 使用自定义密钥提供程序。但现在我需要使用不同的加密模式将数据写入多个 s3 目的地:

  1. CSE 自定义密钥提供程序
  2. CSE-KMS

是否可以通过定义 s3 存储桶和加密类型之间的某种映射来配置 EMR 使用两种加密类型?

或者,因为我使用 spark 结构化流来处理数据并将数据写入 s3,所以我想知道是否可以在 EMRFS 上禁用加密,然后分别为每个流启用 CSE?

我不能代表 Amazon EMR,但在 hadoop 的 s3a 连接器上,您可以逐个桶地设置加密策略。但是,S3A 不支持客户端加密,因为它打破了关于文件长度的基本假设(您可以读取的数据量必须 == 目录 listing/getFileStatus 调用中的长度)。

我希望亚马逊也能做类似的事情。您可以创建具有不同设置的自定义 Hadoop Configuration 对象,并使用它来检索用于保存内容的文件系统实例。不过在 Spark 中比较棘手。

想法是支持任何文件系统方案并单独配置它。例如:

# custom encryption key provider
fs.s3x.cse.enabled = true
fs.s3x.cse.materialsDescription.enabled = true
fs.s3x.cse.encryptionMaterialsProvider = my.company.fs.encryption.CustomKeyProvider

#no encryption
fs.s3u.cse.enabled = false

#AWS KMS
fs.s3k.cse.enabled = true
fs.s3k.cse.encryptionMaterialsProvider = com.amazon.ws.emr.hadoop.fs.cse.KMSEncryptionMaterialsProvider
fs.s3k.cse.kms.keyId = some-kms-id

然后像这样在 spark 中使用它:

StreamingQuery writeStream = session
        .readStream()
        .schema(RecordSchema.fromClass(TestRecord.class))
        .option(OPTION_KEY_DELIMITER, OPTION_VALUE_DELIMITER_TAB)
        .option(OPTION_KEY_QUOTE, OPTION_VALUE_QUOTATION_OFF)
        .csv(“s3x://aws-s3-bucket/input”)
        .as(Encoders.bean(TestRecord.class))
        .writeStream()
        .outputMode(OutputMode.Append())
        .format("parquet")
        .option("path", “s3k://aws-s3-bucket/output”)
        .option("checkpointLocation", “s3u://aws-s3-bucket/checkpointing”)
        .start();

为了解决这个问题,我实现了一个自定义 Hadoop 文件系统(扩展 org.apache.hadoop.fs.FileSystem),该系统将调用委托给真实文件系统,但配置已修改。

// Create delegate FS
this.config.set("fs.s3n.impl", “com.amazon.ws.emr.hadoop.fs.EmrFileSystem”);
this.config.set("fs.s3n.impl.disable.cache", Boolean.toString(true));
this.delegatingFs = FileSystem.get(s3nURI(originalUri, SCHEME_S3N), substituteS3Config(conf));

传递给委派文件系统的配置应采用所有原始设置并将出现的任何 fs.s3*. 替换为 fs.s3n.

private Configuration substituteS3Config(final Configuration conf) {
    if (conf == null) return null;

    final String fsSchemaPrefix = "fs." + getScheme() + ".";
    final String fsS3SchemaPrefix = "fs.s3.";
    final String fsSchemaImpl = "fs." + getScheme() + ".impl";
    Configuration substitutedConfig = new Configuration(conf);
    for (Map.Entry<String, String> configEntry : conf) {
        String propName = configEntry.getKey();
        if (!fsSchemaImpl.equals(propName)
            && propName.startsWith(fsSchemaPrefix)) {
            final String newPropName = propName.replace(fsSchemaPrefix, fsS3SchemaPrefix);
            LOG.info("Substituting property '{}' with '{}'", propName, newPropName);
            substitutedConfig.set(newPropName, configEntry.getValue());
        }
    }

    return substitutedConfig;
}

除此之外确保委托 fs 接收 uris 和支持方案的路径以及 returns 自定义方案的路径

@Override
public FileStatus getFileStatus(final Path f) throws IOException {
    FileStatus status = this.delegatingFs.getFileStatus(s3Path(f));
    if (status != null) {
        status.setPath(customS3Path(status.getPath()));
    }
    return status;
}

private Path s3Path(final Path p) {
    if (p.toUri() != null && getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), SCHEME_S3N));
    }
    return p;
}

private Path customS3Path(final Path p) {
    if (p.toUri() != null && !getScheme().equals(p.toUri().getScheme())) {
        return new Path(s3nURI(p.toUri(), getScheme()));
    }
    return p;
}

private URI s3nURI(final URI originalUri, final String newScheme) {
     try {
         return new URI(
             newScheme,
             originalUri.getUserInfo(),
             originalUri.getHost(),
             originalUri.getPort(),
             originalUri.getPath(),
             originalUri.getQuery(),
             originalUri.getFragment());
     } catch (URISyntaxException e) {
         LOG.warn("Unable to convert URI {} to {} scheme", originalUri, newScheme);
     }

     return originalUri;
}

最后一步是向 Hadoop 注册自定义文件系统(spark-defaults 分类)

spark.hadoop.fs.s3x.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3u.impl = my.company.fs.DynamicS3FileSystem
spark.hadoop.fs.s3k.impl = my.company.fs.DynamicS3FileSystem

当您使用 EMRFS 时,您可以按以下格式指定 per-bucket 配置:

fs.s3.bucket.<bucket name>.<some.configuration>

因此,例如,要关闭除存储桶 s3://foobar 之外的 CSE,您可以设置:

   "Classification": "emrfs-site",
   "Properties": {
      "fs.s3.cse.enabled": "false",
      "fs.s3.bucket.foobar.cse.enabled": "true",
      [your other configs as usual]
   }

请注意,它必须是 fs.s3 而不是像 fs.s3n 那样的 fs.{arbitrary-scheme}