如何使用 spark-csv 包在 HDFS 上只读取 n 行大型 CSV 文件?

How to read only n rows of large CSV file on HDFS using spark-csv package?

我在 HDFS 上有一个很大的分布式文件,每次我使用带有 spark-csv 包的 sqlContext 时,它首先加载整个文件,这需要相当长的时间。

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path")

现在因为我有时只想快速检查一下,所以我只需要整个文件的几行/任意 n 行。

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").take(n)
df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("file_path").head(n)

但是所有这些 运行 在文件加载完成后。我不能在读取文件本身时限制行数吗?我指的是 n_rows 相当于 spark-csv 中的 pandas,例如:

pd_df = pandas.read_csv("file_path", nrows=20)

也可能是spark没有真正加载文件,第一步,但在这种情况下,为什么我的文件加载步骤花了太多时间呢?

我要

df.count()

只给我 n 而不是所有行,可以吗?

您可以使用 limit(n)

sqlContext.format('com.databricks.spark.csv') \
          .options(header='true', inferschema='true').load("file_path").limit(20)

这只会加载 20 行。

我的理解是 spark-csv 模块不支持直接读取几行,作为 解决方法 您可以将文件作为文本文件读取,取任意多行,并将其保存到某个临时位置。保存行后,您可以使用 spark-csv 读取行,包括 inferSchema 选项(如果您处于探索模式,您可能想使用)。

val numberOfLines = ...
spark.
  read.
  text("myfile.csv").
  limit(numberOfLines).
  write.
  text(s"myfile-$numberOfLines.csv")
val justFewLines = spark.
  read.
  option("inferSchema", true). // <-- you are in exploration mode, aren't you?
  csv(s"myfile-$numberOfLines.csv")

不推断架构并使用 limit(n) 在各个方面对我都有效。

f_schema = StructType([
StructField("col1",LongType(),True),
StructField("col2",IntegerType(),True),
StructField("col3",DoubleType(),True)
...
])

df_n = sqlContext.read.format('com.databricks.spark.csv').options(header='true').schema(f_schema).load(data_path).limit(10)

注意:如果我们使用inferschema='true',它又是同一时间,因此可能是同一个旧东西。

但是如果我们不知道架构,Jacek Laskowski 解决方案也很有效。 :)

从 PySpark 2.3 开始,您可以简单地将数据作为文本加载、限制并在结果上应用 csv reader:

(spark
  .read
  .options(inferSchema="true", header="true")
  .csv(
      spark.read.text("/path/to/file")
          .limit(20)                   # Apply limit
          .rdd.flatMap(lambda x: x)))  # Convert to RDD[str]

Spark 2.2 后可用的 Scala 对应物:

spark
  .read
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .csv(spark.read.text("/path/to/file").limit(20).as[String])

在 Spark 3.0.0 或更高版本中,也可以应用限制并使用 from_csv 函数,但它需要一个架构,因此它可能不符合您的要求。

Jacek Laskowski 给出的解决方案效果很好。下面展示一个内存中的变体。

我最近 运行 遇到了这个问题。我正在使用数据块并且有一个巨大的 csv 目录(200 个文件,每个文件 200MB)

我原来有

val df = spark.read.format("csv")
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.load("dbfs:/huge/csv/files/in/this/directory/")

display(df)

这花了很多时间(10 多分钟),但后来我将其更改为下面,它立即 运行(2 秒)

val lines = spark.read.text("dbfs:/huge/csv/files/in/this/directory/").as[String].take(1000)

val df = spark.read
.option("header", true)
.option("sep", ",")
.option("inferSchema", true)
.csv(spark.createDataset(lines))

display(df)

推断文本格式的模式很困难,可以通过这种方式为 csv 和 json(但如果是多行 json)格式完成。

由于我没有在答案中看到该解决方案,纯 SQL-approach 对我有用:

df = spark.sql("SELECT * FROM csv.`/path/to/file` LIMIT 10000")

如果没有 header 列将被命名为 _c0、_c1 等。不需要架构。

这可能会对在 java 工作的人有所帮助。 应用限制无助于减少时间。您必须从文件中收集 n 行。

        DataFrameReader frameReader = spark
          .read()
          .format("csv")
          .option("inferSchema", "true");
    //set framereader options, delimiters etc

    List<String> dataset = spark.read().textFile(filePath).limit(MAX_FILE_READ_SIZE).collectAsList();
    return frameReader.csv(spark.createDataset(dataset, Encoders.STRING()));