如何更改数组中的 Spark Dataframe 列数据类型

How to change Spark Dataframe column data type in an array

关于我的一个更大的问题,我有两个小问题:我想每天读入一次 JSON 数据并将其保存为 Parquet 以供以后与数据相关的工作使用。使用镶木地板要快得多。但我坚持的事实是,在读取该镶木地板时,Spark 总是尝试从模式文件中获取模式,或者只是从第一个镶木地板文件中获取模式,并假定所有文件的模式都相同。但有些情况下,我们在某些列中有几天没有任何数据。

假设我有一个 JSON 文件,其中的数据具有以下架构:

root
 |-- Id: long (nullable = true)    
 |-- People: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Amount: double (nullable = true)

然后我有另一个 JSON 文件,其中 "People" 列没有数据。因此架构如下:

root
 |-- Id: long (nullable = true)    
 |-- People: array (nullable = true)
 |    |-- element: string (containsNull = true)

当我将它们与 read.json 一起阅读时,Spark 遍历所有文件并从这些文件中推断出合并的模式,更具体地说,是从第一个文件中推断出合并的模式,只是将第二个文件中的行留空,但架构是正确的。

但是当我分别读取它们并分别写入 parquet 时,我无法一起读取它们,因为对于 Parquet,模式不匹配,我得到一个错误。

我的第一个想法是读入缺少数据的文件并通过强制转换列类型以匹配第一个模式来手动更改其模式,但是这种手动转换是错误的,它可能不同步而且我没有这样做甚至知道如何将此字符串类型转换为数组或结构类型。

另一个问题是,当 "Amount" 字段只有完整的整数时,Spark 会根据需要将它们读入 long 而不是 double。但是如果我使用:

val df2 = df.withColumn("People.Amount", col("People.Amount").cast(org.apache.spark.sql.types.ArrayType(org.apache.spark.sql.types.DoubleType,true)))

然后它不改变原始列的类型,而是添加一个名为 People.Amount

的新列

我认为您可以通过模式合并来解决一些问题(请参阅文档 here)。如果您拥有的第一个镶木地板具有正确的架构,那么您可以做类似这样的事情来将该架构应用于新的镶木地板吗?

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

编辑

你说有200多个栏目,你都知道吗?我看到了两条前进的道路,可能有多种方法可以实现这一目标。一种是您预先定义可以看到的所有字段。我过去所做的是创建一个 json 文件,其中包含一个虚拟记录,其中包含我想要的所有字段,并且完全按照我想要的方式输入。然后,您始终可以在加载 "Monday" 或 "Tuesday" 数据集的同时加载该记录,并将其删除 post 加载。这可能不是最佳做法,但这就是我跌跌撞撞前进的方式。

另一种方法是停止尝试 load/save 正确模式中的单个数据集,并在加载所有数据后设置模式。听起来不像是您想走的路,但至少您不会遇到这个特定问题。