在 PySpark 中将 ArrayType(StringType()) 的列转换为 ArrayType(DateType())
Convert Column of ArrayType(StringType()) to ArrayType(DateType()) in PySpark
我有一个像下面这样的数据框,我想将其转换为 ISO-8601:
| production_date | expiration_date |
--------------------------------------------------------------
|["20/05/1996","01/01/2018"] | ["15/01/1997","27/03/2019"] |
| .... .... |
--------------------------------------------------------------
我要:
| good_prod_date | good_exp_date |
-------------------------------------------------------------
|[1996-05-20,2018-01-01] | [1997-01-01,2019-03-27] |
| .... .... |
-------------------------------------------------------------
但是,有超过 20 列和数百万行。我试图避免使用 UDF,因为它们效率低下,而且在大多数情况下是一种糟糕的方法。我还避免分解每一列,因为那是:
- 效率低下(不必要地创建了数亿行)
- 不是一个优雅的解决方案
- 我试过了,还是不行
到目前为止我有以下内容:
def explodeCols(df):
return (df
.withColumn("production_date", sf.explode("production_date"))
.withColumn("expiration_date", sf.explode("expiration_date")))
def fixTypes(df):
return (df
.withColumn("production_date", sf.to_date("production_date", "dd/MM/yyyy"))
.withColumn("expiration_date", sf.to_date("expiration_date", "dd/MM/yyyy")))
def consolidate(df):
cols = ["production_date", "expiration_date"]
return df.groupBy("id").agg(*[sf.collect_list(c) for c in cols])
historyDF = (df
.transform(explodeCols)
.transform(fixTypes)
.transform(consolidate))
然而,当我 运行 DataBricks 上的这段代码时,作业永远不会执行,事实上,它会导致 failed/dead 个执行程序(这不好)。
我尝试的另一个解决方案如下:
df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))
但我得到的结果是一个空数组:
| production_date | good_prod_date |
-------------------------------------------------------------
|["20/05/1996","01/01/2018"] | [null,null] |
| .... .... |
-------------------------------------------------------------
使用 pyspark.sql.function.transform
高阶函数代替 explode
函数来转换数组中的每个值。
df
.withColumn("production_date",F.expr("transform(production_date,v -> to_date(v,'dd/MM/yyyy'))"))
.withColumn("expiration_date",F.expr("transform(expiration_date,v -> to_date(v,'dd/MM/yyyy'))"))
.show()
df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))
这将不起作用,因为 production_date
具有不同的日期格式,如果此列具有 yyyy-MM-dd
之类的日期格式,则转换将起作用。
df.select("actual_date").printSchema()
root
|-- actual_date: array (nullable = true)
| |-- element: string (containsNull = true)
df.select("actual_date").show(false)
+------------------------+
|actual_date |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+
df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).printSchema()
root
|-- actual_date: array (nullable = true)
| |-- element: date (containsNull = true)
df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).show()
+------------------------+
|actual_date |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+
我有一个像下面这样的数据框,我想将其转换为 ISO-8601:
| production_date | expiration_date |
--------------------------------------------------------------
|["20/05/1996","01/01/2018"] | ["15/01/1997","27/03/2019"] |
| .... .... |
--------------------------------------------------------------
我要:
| good_prod_date | good_exp_date |
-------------------------------------------------------------
|[1996-05-20,2018-01-01] | [1997-01-01,2019-03-27] |
| .... .... |
-------------------------------------------------------------
但是,有超过 20 列和数百万行。我试图避免使用 UDF,因为它们效率低下,而且在大多数情况下是一种糟糕的方法。我还避免分解每一列,因为那是:
- 效率低下(不必要地创建了数亿行)
- 不是一个优雅的解决方案
- 我试过了,还是不行
到目前为止我有以下内容:
def explodeCols(df):
return (df
.withColumn("production_date", sf.explode("production_date"))
.withColumn("expiration_date", sf.explode("expiration_date")))
def fixTypes(df):
return (df
.withColumn("production_date", sf.to_date("production_date", "dd/MM/yyyy"))
.withColumn("expiration_date", sf.to_date("expiration_date", "dd/MM/yyyy")))
def consolidate(df):
cols = ["production_date", "expiration_date"]
return df.groupBy("id").agg(*[sf.collect_list(c) for c in cols])
historyDF = (df
.transform(explodeCols)
.transform(fixTypes)
.transform(consolidate))
然而,当我 运行 DataBricks 上的这段代码时,作业永远不会执行,事实上,它会导致 failed/dead 个执行程序(这不好)。
我尝试的另一个解决方案如下:
df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))
但我得到的结果是一个空数组:
| production_date | good_prod_date |
-------------------------------------------------------------
|["20/05/1996","01/01/2018"] | [null,null] |
| .... .... |
-------------------------------------------------------------
使用 pyspark.sql.function.transform
高阶函数代替 explode
函数来转换数组中的每个值。
df
.withColumn("production_date",F.expr("transform(production_date,v -> to_date(v,'dd/MM/yyyy'))"))
.withColumn("expiration_date",F.expr("transform(expiration_date,v -> to_date(v,'dd/MM/yyyy'))"))
.show()
df.withColumn("good_prod_date", col("production_date").cast(ArrayType(DateType())))
这将不起作用,因为 production_date
具有不同的日期格式,如果此列具有 yyyy-MM-dd
之类的日期格式,则转换将起作用。
df.select("actual_date").printSchema()
root
|-- actual_date: array (nullable = true)
| |-- element: string (containsNull = true)
df.select("actual_date").show(false)
+------------------------+
|actual_date |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+
df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).printSchema()
root
|-- actual_date: array (nullable = true)
| |-- element: date (containsNull = true)
df.select("actual_date").withColumn("actual_date", F.col("actual_date").cast("array<date>")).show()
+------------------------+
|actual_date |
+------------------------+
|[1997-01-15, 2019-03-27]|
+------------------------+