Multipart upload error to S3 from Spark
I'm getting the error "Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail" while trying to close a sequence file writer. The full exception log is below:
16/12/30 19:47:01 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0001 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:12 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0002 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:23 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0003 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:35 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt1/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0004 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:46 INFO s3n.MultipartUploadOutputStream: uploadPart /mnt/s3/57b63810-c20a-438c-a73f-48d50e0be7d2-0005 94317523 bytes md5: 05ww/fe3pNni9Zvfm+l4Gg== md5hex: d39c30fdf7b7a4d9e2f59bdf9be9781a
16/12/30 19:47:57 ERROR s3n.MultipartUploadOutputStream: Upload attempts for part num: 2 have already reached max limit of: 5, will throw exception and fail
16/12/30 19:47:57 INFO s3n.MultipartUploadOutputStream: completeMultipartUpload error for key: output/part-20176
java.lang.IllegalStateException: Reached max limit of upload attempts for part
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.spawnNewFutureIfNeeded(MultipartUploadOutputStream.java:362)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadMultiParts(MultipartUploadOutputStream.java:422)
at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:471)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
at org.apache.hadoop.io.SequenceFile$Writer.close(SequenceFile.java:1290)
...
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:727)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$$anonfun$apply.apply(RDD.scala:727)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/12/30 19:47:59 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException:
16/12/30 19:48:18 INFO s3n.MultipartUploadOutputStream: uploadPart error com.amazonaws.AbortedException:
The error only shows up after all 5 retries have failed, and I don't understand why. Has anyone seen this error before? What could be causing it?
I'm writing the sequence files with my own multi-output implementation:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.{Metadata, Writer}

class MultiOutputSequenceFileWriter(prefix: String, suffix: String) extends Serializable {
  // One writer per output folder, created lazily on first write
  private val writers = collection.mutable.Map[String, SequenceFile.Writer]()

  /**
   * @param pathKey    folder within prefix where the content will be written
   * @param valueKey   key of the data to be written
   * @param valueValue value of the data to be written
   */
  def write(pathKey: String, valueKey: Any, valueValue: Any) = {
    if (!writers.contains(pathKey)) {
      val path = new Path(prefix + "/" + pathKey + "/" + "part-" + suffix)
      val hadoopConf = new Configuration()
      hadoopConf.setEnum("io.seqfile.compression.type", SequenceFile.CompressionType.NONE)
      val fs = FileSystem.get(hadoopConf)
      writers(pathKey) = SequenceFile.createWriter(hadoopConf, Writer.file(path),
        Writer.keyClass(valueKey.getClass()),
        Writer.valueClass(valueValue.getClass()),
        Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)), // 4 KB
        Writer.replication(fs.getDefaultReplication()),
        Writer.blockSize(1073741824), // 1 GB
        Writer.progressable(null),
        Writer.metadata(new Metadata()))
    }
    writers(pathKey).append(valueKey, valueValue)
  }

  def close = writers.values.foreach(_.close())
}
And I write the sequence files like this:
...
rdd.mapPartitionsWithIndex { (p, it) =>
  val writer = new MultiOutputSequenceFileWriter("s3://bucket/output/", p.toString)
  for ((key1, key2, data) <- it) {
    ...
    writer.write(key1, key2, data)
    ...
  }
  writer.close
  Nil.iterator
}.foreach((x: Nothing) => ()) // force evaluation so the partitions are actually processed
...
Notes:
- The exception happens when I try to close the writer (I believe the writer flushes its remaining content just before closing, and the exception comes from that).
- I reran the same job twice with the same input. The first re-run finished without any error, but the second one hit the error three times. Could this just be a transient problem in S3?
- The failed part files do not exist in S3.
The writer (which, by the way, is Amazon code; it's nothing the Spark or Hadoop teams will deal with) uploads data in blocks as it is generated (in a background thread); the remaining data and the multipart upload itself are committed in close(), which is also where the code blocks waiting for all pending uploads to complete.
It sounds like some of the PUT requests are failing, and that failure is only picked up and reported in the close() call. I don't know whether the EMR s3:// client uses that block size as the marker for its part size; I'd personally recommend a smaller size, such as 128MB.
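If the block size passed to the SequenceFile writer really does drive the part size, one way to try that suggestion is to shrink the value in the writer shown above. A minimal sketch, assuming the rest of MultiOutputSequenceFileWriter stays unchanged:

// Same createWriter call as in the question, but with a 128 MB block size
// instead of 1 GB, so any part sizing derived from it is smaller.
writers(pathKey) = SequenceFile.createWriter(hadoopConf, Writer.file(path),
  Writer.keyClass(valueKey.getClass()),
  Writer.valueClass(valueValue.getClass()),
  Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size", 4096)),
  Writer.replication(fs.getDefaultReplication()),
  Writer.blockSize(128L * 1024 * 1024), // 128 MB rather than 1073741824 (1 GB)
  Writer.progressable(null),
  Writer.metadata(new Metadata()))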
In any case: assume a transient network problem, or possibly that the EC2 VM you were allocated has poor network connectivity. Ask for a new one.
An AWS support engineer mentioned that the bucket was getting a lot of hits around the time the error occurred. The job was retrying the default number of times (5), and most likely all of those retries were throttled. I now add the following configuration parameter when submitting the job to increase the number of retries:
spark.hadoop.fs.s3.maxRetries=20
In addition, I compressed the output so that fewer requests are made to S3. I haven't seen the failure in several runs since these changes.
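For reference, a sketch of what those two changes could look like in code; the app name is hypothetical and the codec choice is my own assumption, not something stated above:

import org.apache.hadoop.io.SequenceFile
import org.apache.hadoop.io.SequenceFile.Writer
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.spark.{SparkConf, SparkContext}

// 1) More retries for the EMR S3 client: spark.hadoop.* settings are copied into
//    the Hadoop Configuration used on the executors, same as passing
//    --conf spark.hadoop.fs.s3.maxRetries=20 to spark-submit.
val sparkConf = new SparkConf()
  .setAppName("multi-output-sequence-files") // hypothetical app name
  .set("spark.hadoop.fs.s3.maxRetries", "20")
val sc = new SparkContext(sparkConf)

// 2) Compressed output: a writer option to use in place of the
//    io.seqfile.compression.type=NONE setting in the writer above.
val compression: Writer.Option =
  Writer.compression(SequenceFile.CompressionType.BLOCK, new DefaultCodec())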