从 S3 事件触发 AWS EMR Spark 作业

Triggering an AWS EMR Spark job from an S3 event

我正在考虑使用 AWS EMR Spark 运行 Spark 应用程序来处理存储在 S3 上的非常大的 Parquet 文件。这里的整体流程是 Java 进程会将这些大文件上传到 S3,我想自动触发 Spark 作业的 运行ning(注入的 S3 键名上传的文件)在这些文件上。

理想情况下,会有某种基于 S3 的 EMR 触发器可供连接;也就是说,我将 EMR/Spark 到 "listen" 配置到 S3 存储桶,并在对该存储桶进行更新时启动 Spark 作业。

如果不存在这样的触发器,我可能会拼凑一些东西,例如从 S3 事件中启动 Lambda,然后让 Lambda 以某种方式触发 EMR Spark 作业。

但是我的理解如果我错了请纠正我)是启动 Spark 作业的唯一方法是至:

  1. 将作业打包为可执行 JAR 文件;和
  2. 通过spark-submitshell脚本
  3. 将其提交到集群(EMR或其他方式)

因此,如果我必须执行基于 Lambda 的 kludge,我不确定触发 EMR/Spark 作业的最佳方法是什么,因为 Lambda 本身并不携带 spark-submit 在他们的 运行 次。即使我配置了自己的 Lambda 运行time(我 相信 现在可以做到),这个解决方案已经感觉非常不稳定和容错。

有人曾经从 S3 触发器或 任何 AWS 触发器触发过 EMR/Spark 作业吗?

EMR Spark 作业可以作为 Adding a Spark Step 中的步骤执行。步骤不只是在bootstrap之后的EMR集群创建时间。

aws emr add-steps --cluster-id j-2AXXXXXXGAPLF --steps Type=Spark,Name="Spark Program",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,10]

因为它是一个 AWS CLI,您可以从 Lambda 调用它,您也可以在其中将 jar 文件上传到 HDFS 或 S3,然后使用 s3:// 或 hdfs:// 指向它。

该文档还有一个 Java 示例。

AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
AmazonElasticMapReduce emr = new AmazonElasticMapReduceClient(credentials);

StepFactory stepFactory = new StepFactory();
AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient(credentials);
AddJobFlowStepsRequest req = new AddJobFlowStepsRequest();
req.withJobFlowId("j-1K48XXXXXXHCB");

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

HadoopJarStepConfig sparkStepConf = new HadoopJarStepConfig()
            .withJar("command-runner.jar")
            .withArgs("spark-submit","--executor-memory","1g","--class","org.apache.spark.examples.SparkPi","/usr/lib/spark/examples/jars/spark-examples.jar","10");            

StepConfig sparkStep = new StepConfig()
            .withName("Spark Step")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(sparkStepConf);

stepConfigs.add(sparkStep);
req.withSteps(stepConfigs);
AddJobFlowStepsResult result = emr.addJobFlowSteps(req);