在 AWS EMR 上使用自定义 jar 的 Spark 流作业在写入时失败

Spark streaming job using custom jar on AWS EMR fails upon write

我正在尝试使用流式数据帧将文件(csv.gz 格式)转换为 parquet。我必须使用流式数据帧,因为压缩的文件大小约为 700 MB。这项工作是 运行 在 AWS EMR 上使用自定义 jar。源、目标和检查点位置都在 AWS S3 上。但是,一旦我尝试写入检查点,作业就会失败并出现以下错误:

java.lang.IllegalArgumentException: 
Wrong FS: s3://my-bucket-name/transformData/checkpoints/sourceName/fileType/metadata,
expected: hdfs://ip-<ip_address>.us-west-2.compute.internal:8020

在 EMR 集群上还有其他 spark 作业 运行ning 从 S3 读取和写入,运行 成功(但它们没有使用 spark 流)。因此,我认为这不是 . I also looked at 中建议的 S3 文件系统访问问题,但答案对我的情况没有帮助。我正在使用 Scala:2.11.8Spark:2.1.0。 以下是我目前的代码

...

    val spark = conf match {
      case null =>
        SparkSession
          .builder()
          .appName(this.getClass.toString)
          .getOrCreate()
      case _ =>
        SparkSession
          .builder()
          .config(conf)
          .getOrCreate()
    }

    // Read CSV file into structured streaming dataframe
    val streamingDF = spark.readStream
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter","|")
      .option("timestampFormat", "dd-MMM-yyyy HH:mm:ss")
      .option("treatEmptyValuesAsNulls", "true")
      .option("nullValue","")
      .schema(schema)
      .load(s"s3://my-bucket-name/rawData/sourceName/fileType/*/*/fileNamePrefix*")
      .withColumn("event_date", "event_datetime".cast("date"))
      .withColumn("event_year", year($"event_date"))
      .withColumn("event_month", month($"event_date"))

    // Write the results to Parquet
    streamingDF.writeStream
      .format("parquet")
      .option("path", "s3://my-bucket-name/transformedData/sourceName/fileType/")
      .option("compression", "gzip")
      .option("checkpointLocation", "s3://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
      .partitionBy("event_year", "event_month")
      .trigger(ProcessingTime("900 seconds"))
      .start()

我也尝试在 URI 中使用 s3n:// 而不是 s3:// 但这似乎并没有有什么影响。

Tl;dr 升级 spark 或避免使用 s3 作为检查点位置

此外,您可能应该使用 s3a://

指定写入路径

A successor to the S3 Native, s3n:// filesystem, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.

https://wiki.apache.org/hadoop/AmazonS3