HDP Sandbox IOException 在 DataFrame 上调用 saveAsTable
HDP Sandbox IOException calling saveAsTable on DataFrame
我正在尝试 运行 下面的示例,该示例试图从 Spark DataFrame 创建 Hive table。当我使用 master=local 调用 spark-submit 时代码有效,但当我使用 master=yarn 调用它时它会抛出异常。
这是调用:
spark-submit --class test.sandbox.HDPRiskFactor --master yarn --name "Risk Factor" ./hdprisk-0.0.1-SNAPSHOT.jar
此外,我从名为 "default.geolocation" 的 Hive 控制台创建了一个 table,但是当我调用 show() 时我无法从 spark 中看到它。我试图将 Yarn 模式下的执行器计数设置为 0,但也不起作用。
1) 为什么代码在 master local 而在 yarn 上工作
2) 为什么我无法从我的 spark 代码中看到在 hive 中创建的 table。
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().getOrCreate()
// val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
val hadoopconf = new Configuration()
val hdfs = FileSystem.get(hadoopconf)
val csvDataDir = "/tmp/data"
//import spark.implicits._
val dataList = List(("geolocation", "csv"), ("trucks", "csv"))
listFiles(this.getClass.getClassLoader.getResource(".").getFile)
dataList.map(path => {
val localFile = path._1 + "." + path._2
val hdfsFile = csvDataDir + "/" + path._1 + "." + path._2
if (!testDirExist(hdfs, hdfsFile)) copyStreamToHdfs(hdfs, "/root/", csvDataDir, localFile)
})
val geoLocationDF = spark.read.format("csv").option("header", "true").load("hdfs:///tmp/data/geolocation.csv")
// Now that we have the data loaded into a DataFrame, we can register a temporary view.
spark.sql("SHOW TABLES").show()
geoLocationDF.write.format("orc").saveAsTable("default.geolocation")
// geoLocationDF.createOrReplaceTempView("geolocation")
spark.sql("select * from default.geolocation").show()
}
我没有正确配置配置单元上下文。所以它正在将文件写入根目录。解决方案是传递正确的配置参数:
val spark = SparkSession.builder()
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.config("spark.sql.sources.maxConcurrentWrites","1")
.config("spark.sql.parquet.compression.codec", "snappy")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("parquet.compression", "SNAPPY")
.config("hive.exec.max.dynamic.partitions", "3000")
.config("parquet.enable.dictionary", "false")
.config("hive.support.concurrency", "true")
.enableHiveSupport()
.getOrCreate()
我正在尝试 运行 下面的示例,该示例试图从 Spark DataFrame 创建 Hive table。当我使用 master=local 调用 spark-submit 时代码有效,但当我使用 master=yarn 调用它时它会抛出异常。 这是调用: spark-submit --class test.sandbox.HDPRiskFactor --master yarn --name "Risk Factor" ./hdprisk-0.0.1-SNAPSHOT.jar 此外,我从名为 "default.geolocation" 的 Hive 控制台创建了一个 table,但是当我调用 show() 时我无法从 spark 中看到它。我试图将 Yarn 模式下的执行器计数设置为 0,但也不起作用。 1) 为什么代码在 master local 而在 yarn 上工作 2) 为什么我无法从我的 spark 代码中看到在 hive 中创建的 table。
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().getOrCreate()
// val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
val hadoopconf = new Configuration()
val hdfs = FileSystem.get(hadoopconf)
val csvDataDir = "/tmp/data"
//import spark.implicits._
val dataList = List(("geolocation", "csv"), ("trucks", "csv"))
listFiles(this.getClass.getClassLoader.getResource(".").getFile)
dataList.map(path => {
val localFile = path._1 + "." + path._2
val hdfsFile = csvDataDir + "/" + path._1 + "." + path._2
if (!testDirExist(hdfs, hdfsFile)) copyStreamToHdfs(hdfs, "/root/", csvDataDir, localFile)
})
val geoLocationDF = spark.read.format("csv").option("header", "true").load("hdfs:///tmp/data/geolocation.csv")
// Now that we have the data loaded into a DataFrame, we can register a temporary view.
spark.sql("SHOW TABLES").show()
geoLocationDF.write.format("orc").saveAsTable("default.geolocation")
// geoLocationDF.createOrReplaceTempView("geolocation")
spark.sql("select * from default.geolocation").show()
}
我没有正确配置配置单元上下文。所以它正在将文件写入根目录。解决方案是传递正确的配置参数:
val spark = SparkSession.builder()
.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
.config("spark.sql.sources.maxConcurrentWrites","1")
.config("spark.sql.parquet.compression.codec", "snappy")
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("parquet.compression", "SNAPPY")
.config("hive.exec.max.dynamic.partitions", "3000")
.config("parquet.enable.dictionary", "false")
.config("hive.support.concurrency", "true")
.enableHiveSupport()
.getOrCreate()