Spark - 将 CSV 文件加载为 DataFrame?
Spark - load CSV file as DataFrame?
我想在 spark 中读取 CSV 并将其转换为 DataFrame 并使用 df.registerTempTable("table_name")
将其存储在 HDFS 中
我试过:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
我得到的错误:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
在 Apache Spark 中将 CSV 文件加载为 DataFrame 的正确命令是什么?
spark-csv 是核心 Spark 功能的一部分,不需要单独的库。
所以你可以做例如
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
在 scala 中,(这适用于任何格式的定界符提到“,”对于 csv,“\t”对于 tsv 等)
val df = sqlContext.read.format("com.databricks.spark.csv")
.option("delimiter", ",")
.load("csvfile.csv")
使用 Spark 2.x
解析 CSV 并加载为 DataFrame/DataSet
首先,初始化 SparkSession
对象 默认情况下它将在 shell 中可用为 spark
val spark = org.apache.spark.sql.SparkSession.builder
.master("local") # Change it as per your cluster
.appName("Spark CSV Reader")
.getOrCreate;
Use any one of the following ways to load CSV as DataFrame/DataSet
1。以编程方式进行
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
更新:添加所有选项 from here 以防 link 将来被破坏
- 路径:文件位置。类似于 Spark 可以接受标准的 Hadoop globbing 表达式。
- header:当设置为true时,文件的第一行将用于命名列,不会包含在数据中。所有类型都将假定为字符串。默认值为 false。
- delimiter:默认使用列分隔,但delimiter可以设置为任意字符
- 引号:引号字符默认为",但可以设置为任意字符。引号内的分隔符将被忽略
- escape:转义字符默认为 ,但可以设置为任意字符。转义引号字符被忽略
- parserLib:默认为“commons”,可设置为“univocity" 使用该库进行 CSV 解析。
- mode:决定解析模式。默认情况下它是 PERMISSIVE。可能的值是:
- PERMISSIVE:尝试解析所有行:为缺少的标记插入空值,忽略多余的标记。
- DROPMALFORMED:删除比预期更少或更多标记的行或与模式不匹配的标记
- FAILFAST:如果遇到任何格式错误的行,则中止并抛出 RuntimeException
字符集:默认为 'UTF-8' 但可以设置为其他有效的字符集名称
- inferSchema:自动推断列类型。它需要额外传递一次数据,默认情况下为 false
注释:跳过以该字符开头的行。默认为“#”。通过将其设置为 null 来禁用评论。
- nullValue:指定一个表示空值的字符串,任何匹配这个字符串的字段在DataFrame
中都会被设置为空值
- dateFormat:指定一个字符串,指示读取日期或时间戳时要使用的日期格式。自定义日期格式遵循 java.text.SimpleDateFormat 中的格式。这适用于 DateType 和 TimestampType。默认情况下,它为 null,这意味着尝试通过 java.sql.Timestamp.valueOf() 和 java.sql.Date.valueOf().
解析时间和日期
2。
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
依赖关系:
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Spark 版本 < 2.0
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
依赖关系:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
使用 Spark 2.0,以下是读取 CSV 的方法
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
在 Java 1.8 中,此代码片段非常适合读取 CSV 文件
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Penny 的 Spark 2 示例是在 spark2 中执行此操作的方法。还有一个技巧:通过对数据进行初始扫描,将选项 inferSchema
设置为 true
,从而为您生成 header
那么这里假设spark
是你设置的sparksession,就是加载amazon在S3上托管的所有Landsat图片的CSV索引文件的操作。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
坏消息是:这会触发文件扫描;对于像这个 20+MB 压缩 CSV 文件这样的大型文件,通过长途连接可能需要 30 秒。记住这一点:你最好在输入模式后手动编码它。
(代码片段 Apache 软件许可证 2.0 已获得许可以避免所有歧义;我在 demo/integration S3 集成测试中所做的事情)
Hadoop 是 2.6,Spark 是 1.6,没有 "databricks" 包。
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
默认文件格式是 spark.read.. 的 Parquet,文件读取 csv,这就是为什么会出现异常。指定 csv 格式 api 您正尝试使用
解析 CSV 文件有很多挑战,如果文件较大,它会不断累加,如果列值中有 non-english/escape/separator/other 个字符,则可能导致解析错误。
神奇之处在于所使用的选项。对我有用并希望能涵盖大部分边缘情况的代码在下面的代码中:
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)
希望对您有所帮助。更多请参考:Using PySpark 2 to read CSV having HTML source code
注意:上面的代码来自 Spark 2 API,其中读取 API 的 CSV 文件与内置的 Spark 可安装包捆绑在一起。
注意:PySpark 是 Spark 的 Python 包装器,与 Scala/Java 共享相同的 API。
如果您使用 scala 2.11 和 Apache 2.0 或更高版本构建 jar。
无需创建 sqlContext
或 sparkContext
对象。只需一个 SparkSession
对象即可满足所有需求。
以下是我的代码,运行良好:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}
object driver {
def main(args: Array[String]) {
val log = LogManager.getRootLogger
log.info("**********JAR EXECUTION STARTED**********")
val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}
如果您在群集中 运行,只需在定义 sparkBuilder
对象
时将 .master("local")
更改为 .master("yarn")
Spark 文档介绍了这一点:
https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
如果使用 spark 2.0+
试试这个
For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")
For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")
For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
注意:- 这适用于任何带分隔符的文件。只需使用 option(“delimiter”,) 来更改值。
希望对您有所帮助。
使用 in-built Spark csv,您可以使用新的 SparkSession object 轻松完成它,适用于 Spark > 2.0。
val df = spark.
read.
option("inferSchema", "false").
option("header","true").
option("mode","DROPMALFORMED").
option("delimiter", ";").
schema(dataSchema).
csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()
您可以设置各种选项。
header
:您的文件是否在顶部包含 header 行
inferSchema
:是否要自动推断模式。默认值为 true
。我总是喜欢提供模式以确保正确的数据类型。
mode
: 解析模式, PERMISSIVE, DROPMALFORMED or FAILFAST
delimiter
:指定分隔符,默认为逗号(',')
要从系统上的相对路径读取,请使用System.getProperty方法获取当前目录,并进一步使用相对路径加载文件。
scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)
spark:2.4.4 scala:2.11.12
将以下 Spark 依赖项添加到 POM 文件:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
Spark 配置:
val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()
读取 csv 文件:
val df = spark.read.option("header", "true").csv("FILE_PATH")
显示输出:
df.show()
使用 Spark 2.4+,如果您想从本地目录加载 csv,则可以使用 2 个会话并将其加载到配置单元中。第一个会话应使用 master() 配置创建为 "local[*]",第二个会话应使用 "yarn" 并启用 Hive。
下面的对我有用。
import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._
object testCSV {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()
import spark_local.implicits._
spark_local.sql("SET").show(100,false)
val local_path="/tmp/data/spend_diversity.csv" // Local file
val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
df_local.show(false)
val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()
import spark.implicits._
spark.sql("SET").show(100,false)
val df = df_local
df.createOrReplaceTempView("lcsv")
spark.sql(" drop table if exists work.local_csv ")
spark.sql(" create table work.local_csv as select * from lcsv ")
}
当 运行 与 spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
时,一切正常并在配置单元中创建了 table。
我想在 spark 中读取 CSV 并将其转换为 DataFrame 并使用 df.registerTempTable("table_name")
我试过:
scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")
我得到的错误:
java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
在 Apache Spark 中将 CSV 文件加载为 DataFrame 的正确命令是什么?
spark-csv 是核心 Spark 功能的一部分,不需要单独的库。 所以你可以做例如
df = spark.read.format("csv").option("header", "true").load("csvfile.csv")
在 scala 中,(这适用于任何格式的定界符提到“,”对于 csv,“\t”对于 tsv 等)
val df = sqlContext.read.format("com.databricks.spark.csv")
.option("delimiter", ",")
.load("csvfile.csv")
使用 Spark 2.x
解析 CSV 并加载为 DataFrame/DataSet首先,初始化 SparkSession
对象 默认情况下它将在 shell 中可用为 spark
val spark = org.apache.spark.sql.SparkSession.builder
.master("local") # Change it as per your cluster
.appName("Spark CSV Reader")
.getOrCreate;
Use any one of the following ways to load CSV as
DataFrame/DataSet
1。以编程方式进行
val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")
更新:添加所有选项 from here 以防 link 将来被破坏
- 路径:文件位置。类似于 Spark 可以接受标准的 Hadoop globbing 表达式。
- header:当设置为true时,文件的第一行将用于命名列,不会包含在数据中。所有类型都将假定为字符串。默认值为 false。
- delimiter:默认使用列分隔,但delimiter可以设置为任意字符
- 引号:引号字符默认为",但可以设置为任意字符。引号内的分隔符将被忽略
- escape:转义字符默认为 ,但可以设置为任意字符。转义引号字符被忽略
- parserLib:默认为“commons”,可设置为“univocity" 使用该库进行 CSV 解析。
- mode:决定解析模式。默认情况下它是 PERMISSIVE。可能的值是:
- PERMISSIVE:尝试解析所有行:为缺少的标记插入空值,忽略多余的标记。
- DROPMALFORMED:删除比预期更少或更多标记的行或与模式不匹配的标记
- FAILFAST:如果遇到任何格式错误的行,则中止并抛出 RuntimeException 字符集:默认为 'UTF-8' 但可以设置为其他有效的字符集名称
- inferSchema:自动推断列类型。它需要额外传递一次数据,默认情况下为 false 注释:跳过以该字符开头的行。默认为“#”。通过将其设置为 null 来禁用评论。
- nullValue:指定一个表示空值的字符串,任何匹配这个字符串的字段在DataFrame 中都会被设置为空值
- dateFormat:指定一个字符串,指示读取日期或时间戳时要使用的日期格式。自定义日期格式遵循 java.text.SimpleDateFormat 中的格式。这适用于 DateType 和 TimestampType。默认情况下,它为 null,这意味着尝试通过 java.sql.Timestamp.valueOf() 和 java.sql.Date.valueOf(). 解析时间和日期
2。
val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")
依赖关系:
"org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,
Spark 版本 < 2.0
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");
依赖关系:
"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,
使用 Spark 2.0,以下是读取 CSV 的方法
val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()
val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)
在 Java 1.8 中,此代码片段非常适合读取 CSV 文件
POM.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>
Java
SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();
Penny 的 Spark 2 示例是在 spark2 中执行此操作的方法。还有一个技巧:通过对数据进行初始扫描,将选项 inferSchema
设置为 true
那么这里假设spark
是你设置的sparksession,就是加载amazon在S3上托管的所有Landsat图片的CSV索引文件的操作。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
坏消息是:这会触发文件扫描;对于像这个 20+MB 压缩 CSV 文件这样的大型文件,通过长途连接可能需要 30 秒。记住这一点:你最好在输入模式后手动编码它。
(代码片段 Apache 软件许可证 2.0 已获得许可以避免所有歧义;我在 demo/integration S3 集成测试中所做的事情)
Hadoop 是 2.6,Spark 是 1.6,没有 "databricks" 包。
import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;
val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))
val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))
val df = sqlContext.createDataFrame(rdd, schema)
默认文件格式是 spark.read.. 的 Parquet,文件读取 csv,这就是为什么会出现异常。指定 csv 格式 api 您正尝试使用
解析 CSV 文件有很多挑战,如果文件较大,它会不断累加,如果列值中有 non-english/escape/separator/other 个字符,则可能导致解析错误。
神奇之处在于所使用的选项。对我有用并希望能涵盖大部分边缘情况的代码在下面的代码中:
### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)
希望对您有所帮助。更多请参考:Using PySpark 2 to read CSV having HTML source code
注意:上面的代码来自 Spark 2 API,其中读取 API 的 CSV 文件与内置的 Spark 可安装包捆绑在一起。
注意:PySpark 是 Spark 的 Python 包装器,与 Scala/Java 共享相同的 API。
如果您使用 scala 2.11 和 Apache 2.0 或更高版本构建 jar。
无需创建 sqlContext
或 sparkContext
对象。只需一个 SparkSession
对象即可满足所有需求。
以下是我的代码,运行良好:
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}
object driver {
def main(args: Array[String]) {
val log = LogManager.getRootLogger
log.info("**********JAR EXECUTION STARTED**********")
val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}
如果您在群集中 运行,只需在定义 sparkBuilder
对象
.master("local")
更改为 .master("yarn")
Spark 文档介绍了这一点: https://spark.apache.org/docs/2.2.0/sql-programming-guide.html
如果使用 spark 2.0+
试试这个For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")
For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")
For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")
注意:- 这适用于任何带分隔符的文件。只需使用 option(“delimiter”,) 来更改值。
希望对您有所帮助。
使用 in-built Spark csv,您可以使用新的 SparkSession object 轻松完成它,适用于 Spark > 2.0。
val df = spark.
read.
option("inferSchema", "false").
option("header","true").
option("mode","DROPMALFORMED").
option("delimiter", ";").
schema(dataSchema).
csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()
您可以设置各种选项。
header
:您的文件是否在顶部包含 header 行inferSchema
:是否要自动推断模式。默认值为true
。我总是喜欢提供模式以确保正确的数据类型。mode
: 解析模式, PERMISSIVE, DROPMALFORMED or FAILFASTdelimiter
:指定分隔符,默认为逗号(',')
要从系统上的相对路径读取,请使用System.getProperty方法获取当前目录,并进一步使用相对路径加载文件。
scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)
spark:2.4.4 scala:2.11.12
将以下 Spark 依赖项添加到 POM 文件:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
Spark 配置:
val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()
读取 csv 文件:
val df = spark.read.option("header", "true").csv("FILE_PATH")
显示输出:
df.show()
使用 Spark 2.4+,如果您想从本地目录加载 csv,则可以使用 2 个会话并将其加载到配置单元中。第一个会话应使用 master() 配置创建为 "local[*]",第二个会话应使用 "yarn" 并启用 Hive。
下面的对我有用。
import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._
object testCSV {
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()
import spark_local.implicits._
spark_local.sql("SET").show(100,false)
val local_path="/tmp/data/spend_diversity.csv" // Local file
val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
df_local.show(false)
val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()
import spark.implicits._
spark.sql("SET").show(100,false)
val df = df_local
df.createOrReplaceTempView("lcsv")
spark.sql(" drop table if exists work.local_csv ")
spark.sql(" create table work.local_csv as select * from lcsv ")
}
当 运行 与 spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar
时,一切正常并在配置单元中创建了 table。