在 AWS EMR 上使用自定义 jar 的 Spark 流作业在写入时失败
Spark streaming job using custom jar on AWS EMR fails upon write
我正在尝试使用流式数据帧将文件(csv.gz 格式)转换为 parquet。我必须使用流式数据帧,因为压缩的文件大小约为 700 MB。这项工作是 运行 在 AWS EMR 上使用自定义 jar。源、目标和检查点位置都在 AWS S3 上。但是,一旦我尝试写入检查点,作业就会失败并出现以下错误:
java.lang.IllegalArgumentException:
Wrong FS: s3://my-bucket-name/transformData/checkpoints/sourceName/fileType/metadata,
expected: hdfs://ip-<ip_address>.us-west-2.compute.internal:8020
在 EMR 集群上还有其他 spark 作业 运行ning 从 S3 读取和写入,运行 成功(但它们没有使用 spark 流)。因此,我认为这不是 . I also looked at 中建议的 S3 文件系统访问问题,但答案对我的情况没有帮助。我正在使用 Scala:2.11.8 和 Spark:2.1.0。
以下是我目前的代码
...
val spark = conf match {
case null =>
SparkSession
.builder()
.appName(this.getClass.toString)
.getOrCreate()
case _ =>
SparkSession
.builder()
.config(conf)
.getOrCreate()
}
// Read CSV file into structured streaming dataframe
val streamingDF = spark.readStream
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter","|")
.option("timestampFormat", "dd-MMM-yyyy HH:mm:ss")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue","")
.schema(schema)
.load(s"s3://my-bucket-name/rawData/sourceName/fileType/*/*/fileNamePrefix*")
.withColumn("event_date", "event_datetime".cast("date"))
.withColumn("event_year", year($"event_date"))
.withColumn("event_month", month($"event_date"))
// Write the results to Parquet
streamingDF.writeStream
.format("parquet")
.option("path", "s3://my-bucket-name/transformedData/sourceName/fileType/")
.option("compression", "gzip")
.option("checkpointLocation", "s3://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
.partitionBy("event_year", "event_month")
.trigger(ProcessingTime("900 seconds"))
.start()
我也尝试在 URI 中使用 s3n:// 而不是 s3:// 但这似乎并没有有什么影响。
Tl;dr 升级 spark 或避免使用 s3 作为检查点位置
此外,您可能应该使用 s3a://
指定写入路径
A successor to the S3 Native, s3n:// filesystem, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.
我正在尝试使用流式数据帧将文件(csv.gz 格式)转换为 parquet。我必须使用流式数据帧,因为压缩的文件大小约为 700 MB。这项工作是 运行 在 AWS EMR 上使用自定义 jar。源、目标和检查点位置都在 AWS S3 上。但是,一旦我尝试写入检查点,作业就会失败并出现以下错误:
java.lang.IllegalArgumentException:
Wrong FS: s3://my-bucket-name/transformData/checkpoints/sourceName/fileType/metadata,
expected: hdfs://ip-<ip_address>.us-west-2.compute.internal:8020
在 EMR 集群上还有其他 spark 作业 运行ning 从 S3 读取和写入,运行 成功(但它们没有使用 spark 流)。因此,我认为这不是
...
val spark = conf match {
case null =>
SparkSession
.builder()
.appName(this.getClass.toString)
.getOrCreate()
case _ =>
SparkSession
.builder()
.config(conf)
.getOrCreate()
}
// Read CSV file into structured streaming dataframe
val streamingDF = spark.readStream
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter","|")
.option("timestampFormat", "dd-MMM-yyyy HH:mm:ss")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue","")
.schema(schema)
.load(s"s3://my-bucket-name/rawData/sourceName/fileType/*/*/fileNamePrefix*")
.withColumn("event_date", "event_datetime".cast("date"))
.withColumn("event_year", year($"event_date"))
.withColumn("event_month", month($"event_date"))
// Write the results to Parquet
streamingDF.writeStream
.format("parquet")
.option("path", "s3://my-bucket-name/transformedData/sourceName/fileType/")
.option("compression", "gzip")
.option("checkpointLocation", "s3://my-bucket-name/transformedData/checkpoints/sourceName/fileType/")
.partitionBy("event_year", "event_month")
.trigger(ProcessingTime("900 seconds"))
.start()
我也尝试在 URI 中使用 s3n:// 而不是 s3:// 但这似乎并没有有什么影响。
Tl;dr 升级 spark 或避免使用 s3 作为检查点位置
此外,您可能应该使用 s3a://
指定写入路径A successor to the S3 Native, s3n:// filesystem, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.