如何使用 spark-csv 包在 HDFS 上只读取 n 行大型 CSV 文件?
How to read only n rows of large CSV file on HDFS using spark-csv package?
我在 HDFS 上有一个很大的分布式文件,每次我使用带有 spark-csv 包的 sqlContext 时,它首先加载整个文件,这需要相当长的时间。
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")
现在因为我有时只想快速检查一下,所以我只需要整个文件的几行/任意 n 行。
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)
但是所有这些 运行 在文件加载完成后。我不能在读取文件本身时限制行数吗?我指的是 n_rows 相当于 spark-csv 中的 pandas,例如:
pd_df = pandas.read_csv("file_path", nrows=20)
也可能是spark没有真正加载文件,第一步,但在这种情况下,为什么我的文件加载步骤花了太多时间呢?
我要
df.count()
只给我 n
而不是所有行,可以吗?
您可以使用 limit(n)
。
sqlContext.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true').load("file_path").limit(20)
这只会加载 20 行。
我的理解是 spark-csv 模块不支持直接读取几行,作为 解决方法 您可以将文件作为文本文件读取,取任意多行,并将其保存到某个临时位置。保存行后,您可以使用 spark-csv 读取行,包括 inferSchema
选项(如果您处于探索模式,您可能想使用)。
val numberOfLines = ...
spark.
read.
text("myfile.csv").
limit(numberOfLines).
write.
text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
read.
option("inferSchema", true). // <-- you are in exploration mode, aren't you?
csv(s"myfile-$numberOfLines.csv")
不推断架构并使用 limit(n)
在各个方面对我都有效。
f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)
注意:如果我们使用inferschema='true'
,它又是同一时间,因此可能是同一个旧东西。
但是如果我们不知道架构,Jacek Laskowski 解决方案也很有效。 :)
从 PySpark 2.3 开始,您可以简单地将数据作为文本加载、限制并在结果上应用 csv reader:
(spark
.read
.options(inferSchema="true", header="true")
.csv(
spark.read.text("/path/to/file")
.limit(20) # Apply limit
.rdd.flatMap(lambda x: x))) # Convert to RDD[str]
Spark 2.2 后可用的 Scala 对应物:
spark
.read
.options(Map("inferSchema" -> "true", "header" -> "true"))
.csv(spark.read.text("/path/to/file").limit(20).as[String])
在 Spark 3.0.0 或更高版本中,也可以应用限制并使用 from_csv
函数,但它需要一个架构,因此它可能不符合您的要求。
Jacek Laskowski 给出的解决方案效果很好。下面展示一个内存中的变体。
我最近 运行 遇到了这个问题。我正在使用数据块并且有一个巨大的 csv 目录(200 个文件,每个文件 200MB)
我原来有
val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")
display(df)
这花了很多时间(10 多分钟),但后来我将其更改为下面,它立即 运行(2 秒)
val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)
val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))
display(df)
推断文本格式的模式很困难,可以通过这种方式为 csv 和 json(但如果是多行 json)格式完成。
由于我没有在答案中看到该解决方案,纯 SQL-approach 对我有用:
df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")
如果没有 header 列将被命名为 _c0、_c1 等。不需要架构。
这可能会对在 java 工作的人有所帮助。
应用限制无助于减少时间。您必须从文件中收集 n 行。
DataFrameReader frameReader = spark
.read()
.format("csv")
.option("inferSchema", "true");
//set framereader options, delimiters etc
List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));
我在 HDFS 上有一个很大的分布式文件,每次我使用带有 spark-csv 包的 sqlContext 时,它首先加载整个文件,这需要相当长的时间。
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")
现在因为我有时只想快速检查一下,所以我只需要整个文件的几行/任意 n 行。
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)
但是所有这些 运行 在文件加载完成后。我不能在读取文件本身时限制行数吗?我指的是 n_rows 相当于 spark-csv 中的 pandas,例如:
pd_df = pandas.read_csv("file_path", nrows=20)
也可能是spark没有真正加载文件,第一步,但在这种情况下,为什么我的文件加载步骤花了太多时间呢?
我要
df.count()
只给我 n
而不是所有行,可以吗?
您可以使用 limit(n)
。
sqlContext.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true').load("file_path").limit(20)
这只会加载 20 行。
我的理解是 spark-csv 模块不支持直接读取几行,作为 解决方法 您可以将文件作为文本文件读取,取任意多行,并将其保存到某个临时位置。保存行后,您可以使用 spark-csv 读取行,包括 inferSchema
选项(如果您处于探索模式,您可能想使用)。
val numberOfLines = ...
spark.
read.
text("myfile.csv").
limit(numberOfLines).
write.
text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
read.
option("inferSchema", true). // <-- you are in exploration mode, aren't you?
csv(s"myfile-$numberOfLines.csv")
不推断架构并使用 limit(n)
在各个方面对我都有效。
f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)
注意:如果我们使用inferschema='true'
,它又是同一时间,因此可能是同一个旧东西。
但是如果我们不知道架构,Jacek Laskowski 解决方案也很有效。 :)
从 PySpark 2.3 开始,您可以简单地将数据作为文本加载、限制并在结果上应用 csv reader:
(spark
.read
.options(inferSchema="true", header="true")
.csv(
spark.read.text("/path/to/file")
.limit(20) # Apply limit
.rdd.flatMap(lambda x: x))) # Convert to RDD[str]
Spark 2.2 后可用的 Scala 对应物:
spark
.read
.options(Map("inferSchema" -> "true", "header" -> "true"))
.csv(spark.read.text("/path/to/file").limit(20).as[String])
在 Spark 3.0.0 或更高版本中,也可以应用限制并使用 from_csv
函数,但它需要一个架构,因此它可能不符合您的要求。
Jacek Laskowski 给出的解决方案效果很好。下面展示一个内存中的变体。
我最近 运行 遇到了这个问题。我正在使用数据块并且有一个巨大的 csv 目录(200 个文件,每个文件 200MB)
我原来有
val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")
display(df)
这花了很多时间(10 多分钟),但后来我将其更改为下面,它立即 运行(2 秒)
val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)
val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))
display(df)
推断文本格式的模式很困难,可以通过这种方式为 csv 和 json(但如果是多行 json)格式完成。
由于我没有在答案中看到该解决方案,纯 SQL-approach 对我有用:
df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")
如果没有 header 列将被命名为 _c0、_c1 等。不需要架构。
这可能会对在 java 工作的人有所帮助。 应用限制无助于减少时间。您必须从文件中收集 n 行。
DataFrameReader frameReader = spark
.read()
.format("csv")
.option("inferSchema", "true");
//set framereader options, delimiters etc
List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));