error: path does not exist in spark submit with hadoop
We are using the command /home/ubuntu/spark/bin/spark-submit --master yarn --deploy-mode cluster --class "SimpleApp" /home/ubuntu/spark/examples/src/main/scala/sbt/target/scala-2.11/teste_2.11-1.0.jar
to run the script below:
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import org.apache.spark._
import org.apache.spark
import org.apache.spark.sql
import org.apache.spark.SparkContext._
object SimpleApp {
  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("query1").master("yarn").getOrCreate
    val header = StructType(Array(
      StructField("medallion", StringType, true),
      StructField("hack_license", StringType, true),
      StructField("vendor_id", StringType, true),
      StructField("rate_code", IntegerType, true),
      StructField("store_and_fwd_flag", StringType, true),
      StructField("pickup_datetime", TimestampType, true),
      StructField("dropoff_datetime", TimestampType, true),
      StructField("passenger_count", IntegerType, true),
      StructField("trip_time_in_secs", IntegerType, true),
      StructField("trip_distance", FloatType, true),
      StructField("pickup_longitude", FloatType, true),
      StructField("pickup_latitude", FloatType, true),
      StructField("dropoff_longitude", FloatType, true),
      StructField("dropoff_latitude", FloatType, true),
      StructField("payment_type", StringType, true),
      StructField("fare_amount", FloatType, true),
      StructField("surcharge", FloatType, true),
      StructField("mta_tax", FloatType, true),
      StructField("trip_amount", FloatType, true),
      StructField("tolls_amount", FloatType, true),
      StructField("total_amount", FloatType, true),
      StructField("zone", StringType, true)))
    val nyct = spark.read.format("csv").option("delimiter", ",").option("header", "true").schema(header).load("/home/ubuntu/trip_data/trip_data_fare_1.csv")
    nyct.createOrReplaceTempView("nyct_temp_table")
    spark.time(spark.sql("""SELECT zone, COUNT(*) AS accesses FROM nyct_temp_table WHERE (HOUR(dropoff_datetime) >= 8 AND HOUR(dropoff_datetime) <= 19) GROUP BY zone ORDER BY accesses DESC""").show())
  }
}
The idea is to run the query in this script on a cluster composed of Spark and Hadoop. But at the end of the execution it raises an error while reading the CSV file from the path /home/ubuntu/trip_data/trip_data_fare_1.csv. This is the picture of the error.
I think the problem is that the worker nodes cannot find the file, which only exists in the master's home directory. Does anyone know how I can fix this and run this script on the cluster?
Since you are running on a cluster, the file should be in HDFS. You can copy it from the local file system to HDFS with:
hadoop fs -put source_path dest_path
and then use dest_path in your code.
In your case, run this on the host that has the local file:
hadoop fs -put /home/ubuntu/trip_data/trip_data_fare_1.csv <some_hdfs_location>
Verify that the copy worked with:
hdfs dfs -ls <some_hdfs_location>
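Once the file is in HDFS, point the load call in your script at that location instead of the local path. A minimal sketch, assuming the CSV was copied to the hypothetical HDFS directory /user/ubuntu/trip_data (substitute whatever <some_hdfs_location> you actually used):

// hypothetical HDFS path; replace with the destination you passed to hadoop fs -put
val nyct = spark.read
  .format("csv")
  .option("delimiter", ",")
  .option("header", "true")
  .schema(header)
  .load("hdfs:///user/ubuntu/trip_data/trip_data_fare_1.csv")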
If I remember correctly, Spark is treating your local file system as its default file system, which is why you are facing this error. The Hadoop configuration should be passed into the Spark context, and HADOOP_CONF_DIR should be set in the spark-env.sh file on all the nodes. Make sure HADOOP_CONF_DIR is specified on every node.
import org.apache.hadoop.fs.Path

val spCont = spark.sparkContext          // the SparkContext behind your SparkSession
val config = spCont.hadoopConfiguration  // the Hadoop Configuration Spark uses to resolve paths
// register the cluster's core-site.xml (fill in the path for your installation)
config.addResource(new Path(s"${sys.env("HADOOP_HOME")}<path to core-site.xml>"))
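To see which file system Spark will resolve unqualified paths against, you can inspect fs.defaultFS on that same configuration; you can also sidestep the ambiguity entirely by using a fully qualified URI in the load call. A minimal sketch, assuming a hypothetical NameNode address namenode-host:9000 and the same hypothetical HDFS location as above:

// print the default file system used for unqualified paths (should be hdfs://... on the cluster)
println(config.get("fs.defaultFS"))
// with a fully qualified URI the path is resolved on HDFS regardless of the default file system
val df = spark.read.format("csv").option("header", "true").schema(header)
  .load("hdfs://namenode-host:9000/user/ubuntu/trip_data/trip_data_fare_1.csv")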