How to load data, with array type column, from CSV to spark dataframes
I have a CSV file like this:
name,age,languages,experience
'Alice',31,['C++', 'Java'],2
'Bob',34,['Java', 'Python'],2
'Smith',35,['Ruby', 'Java'],3
'David',36,['C', 'Java', 'R'],4
When loading the data, all columns are loaded as strings by default.
scala> val df = spark.read.format("csv").option("header",true).load("data.csv")
df: org.apache.spark.sql.DataFrame = [name: string, age: string ... 2 more fields]
scala> df.show()
+-------+---+------------------+----------+
| name|age| languages|experience|
+-------+---+------------------+----------+
|'Alice'| 31| ['C++', 'Java']| 2|
| 'Bob'| 34|['Java', 'Python']| 2|
|'Smith'| 35| ['Ruby', 'Java']| 3|
|'David'| 36|['C', 'Java', 'R']| 4|
+-------+---+------------------+----------+
scala> df.printSchema()
root
|-- name: string (nullable = true)
|-- age: string (nullable = true)
|-- languages: string (nullable = true)
|-- experience: string (nullable = true)
So I defined a custom schema with String, Integer, Array, and Integer data types:
import org.apache.spark.sql.types.{StructField, StructType, StringType, ArrayType, IntegerType}
val custom_schema = new StructType(Array(
  StructField("name", StringType),
  StructField("age", IntegerType),
  StructField("languages", ArrayType(StringType)),
  StructField("experience", IntegerType)
))
When I load the data with this custom schema, I get an error:
scala> val df = spark.read.format("csv").option("header",true).schema(custom_schema).load("data.csv")
org.apache.spark.sql.AnalysisException: CSV data source does not support array<string> data type.
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$verifySchema(DataSourceUtils.scala:67)
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.$anonfun$verifySchema$adapted(DataSourceUtils.scala:65)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:102)
at org.apache.spark.sql.execution.datasources.DataSourceUtils$.verifySchema(DataSourceUtils.scala:65)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:445)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:326)
at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:308)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:308)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:240)
... 47 elided
How can I load the data into a spark dataframe with this column as an array?
You can convert it to an array after reading it from the file: use regexp_replace to remove the square brackets ([ and ]), then split on the comma (,) with split. For example:
import org.apache.spark.sql.functions.{col, regexp_replace, split}

val df = spark.read.format("csv").option("header", true).load("data.csv")

// Remove the brackets, then split the remaining string on commas
val transformedDf = df.withColumn("languages",
  split(
    regexp_replace(col("languages"), "\\[|\\]", ""),
    ","
  )
)
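Note that after this transformation each array element still carries the single quotes and leading spaces from the raw text (e.g. 'C++', ' Java'), and age and experience are still strings. Here is a minimal follow-up sketch, assuming Spark 3.x (where the higher-order transform function is available), that trims each element and casts the numeric columns:

import org.apache.spark.sql.functions.{col, regexp_replace, transform, trim}

// Strip the single quotes and surrounding whitespace from every array
// element, then cast the numeric columns from string to int.
val cleanedDf = transformedDf
  .withColumn("languages",
    transform(col("languages"), x => trim(regexp_replace(x, "'", ""))))
  .withColumn("age", col("age").cast("int"))
  .withColumn("experience", col("experience").cast("int"))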