写入 hdfs 路径时出现错误 java.io.IOException:重命名失败
While writing to hdfs path getting error java.io.IOException: Failed to rename
我正在使用 spark-sql-2.4.1v,它使用的是 hadoop-2.6.5.jar 版本。我需要先将数据保存在 hdfs 上,然后再转移到 cassandra。
因此,我试图将数据保存在 hdfs 上,如下所示:
String hdfsPath = "/user/order_items/";
cleanedDs.createTempViewOrTable("source_tab");
givenItemList.parallelStream().forEach( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(hdfsPath, resultDs );
});
public static void saveDsToHdfs(String parquet_file, Dataset<Row> df) {
df.write()
.format("parquet")
.mode("append")
.save(parquet_file);
logger.info(" Saved parquet file : " + parquet_file + "successfully");
}
当我 运行 我在集群上的作业失败时抛出此错误:
java.io.IOException: Failed to rename FileStatus{path=hdfs:/user/order_items/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.parquet; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to hdfs:/user/order_items/part-00007.parquet
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
请建议如何解决此问题?
您可以在一个作业中完成所有选择,在一个 table 中获得所有选择和并集。
Dataset<Row> resultDs = givenItemList.parallelStream().map( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
return sparkSession.sql(query);
}).reduce((a, b) -> a.union(b)).get
saveDsToHdfs(hdfsPath, resultDs );
错误是您试图将数据帧写入给定 ItemList 集合中每个项目的相同位置。通常如果这样做它应该给出错误
OutputDirectory already exists
但是因为 foreach 函数会在并行线程中执行所有项目,所以你得到这个 error.You 可以像这样为每个线程提供单独的目录
givenItemList.parallelStream().forEach( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(Strin.format("%s_item",hdfsPath), resultDs );
});
否则你也可以在 hdfspath 下有这样的子目录
givenItemList.parallelStream().forEach( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(Strin.format("%s/item",hdfsPath), resultDs );
});
`
我正在使用 spark-sql-2.4.1v,它使用的是 hadoop-2.6.5.jar 版本。我需要先将数据保存在 hdfs 上,然后再转移到 cassandra。 因此,我试图将数据保存在 hdfs 上,如下所示:
String hdfsPath = "/user/order_items/";
cleanedDs.createTempViewOrTable("source_tab");
givenItemList.parallelStream().forEach( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(hdfsPath, resultDs );
});
public static void saveDsToHdfs(String parquet_file, Dataset<Row> df) {
df.write()
.format("parquet")
.mode("append")
.save(parquet_file);
logger.info(" Saved parquet file : " + parquet_file + "successfully");
}
当我 运行 我在集群上的作业失败时抛出此错误:
java.io.IOException: Failed to rename FileStatus{path=hdfs:/user/order_items/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.parquet; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to hdfs:/user/order_items/part-00007.parquet
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
请建议如何解决此问题?
您可以在一个作业中完成所有选择,在一个 table 中获得所有选择和并集。
Dataset<Row> resultDs = givenItemList.parallelStream().map( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
return sparkSession.sql(query);
}).reduce((a, b) -> a.union(b)).get
saveDsToHdfs(hdfsPath, resultDs );
错误是您试图将数据帧写入给定 ItemList 集合中每个项目的相同位置。通常如果这样做它应该给出错误
OutputDirectory already exists
但是因为 foreach 函数会在并行线程中执行所有项目,所以你得到这个 error.You 可以像这样为每个线程提供单独的目录
givenItemList.parallelStream().forEach( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(Strin.format("%s_item",hdfsPath), resultDs );
});
否则你也可以在 hdfspath 下有这样的子目录
givenItemList.parallelStream().forEach( item -> {
String query = "select $item as itemCol , avg($item) as mean groupBy year";
Dataset<Row> resultDs = sparkSession.sql(query);
saveDsToHdfs(Strin.format("%s/item",hdfsPath), resultDs );
}); `