如何在 PySpark 中将列从字符串转换为数组

How to convert a column from string to array in PySpark

我有一个从继承的数据集转换而来的数据框,如下所示:

data = [("[]","2000","M",False),
            ("[{'username':'aabb','points':'200','active':'true'}, {'username':'bbaa22','points':'0','active':'false'}]","1999","F",True),
            ("[{'username':'topuser','points':'855','active':'false'}]","1974","M",True),
            ("[]","2005","F",False),
            ("[{'username':'myprimary','points':'1050','active':'true'}, {'username':'mylurk','points':'100','active':'true'}, {'username':'closedlurk','points':'50','active':'false'}]","1992","M",True)
            ]

columns=["user","dob_year","gender","member"]
df=spark.createDataFrame(data,columns)
+--------------------+--------+------+------+
|                user|dob_year|gender|member|
+--------------------+--------+------+------+
|                  []|    2000|     M| false|
|[{'username':'aab...|    1999|     F|  true|
|[{'username':'top...|    1974|     M|  true|
|                  []|    2005|     F| false|
|[{'username':'myp...|    1992|     M|  true|
+--------------------+--------+------+------+

我需要从 user 列中提取一些元素,我尝试使用 pyspark explode 函数。

from pyspark.sql.functions import explode

df2 = df.select(explode(df.user), df.dob_year)

当我尝试这样做时,我遇到了以下错误:

AnalysisException: cannot resolve 'explode(user)' due to data type mismatch: input to function explode should be array or map type, not string;

当我运行 df.printSchema()时,我意识到用户列是字符串,而不是所需的列表。

我还尝试通过创建 UDF

将列中的字符串转换为数组
import pyspark.sql.functions as f

df2 = df.withColumn("user", str2list_udf(f.col("user"))).withColumn("user",df.user.cast(ArrayType(StringType())))

当我这样做时,我遇到了以下错误:

AnalysisException: cannot resolve 'user' due to data type mismatch: cannot cast string to array;

如何将此列中的数据转换或转换为数组,以便可以利用爆炸函数并将各个键解析到它们自己的列中(例如:为 usernamepointsactive)?如果 explode 不是最好的方法,我应该遵循另一条路线吗?

如果有帮助,这是我创建的 UDF:

def str2list(x):
  if x == '[]':
    return list()
  else:
    return list(x)

str2list_udf = udf(lambda x: str2list(x))

谢谢

使用from_json将字符串化数组转换为结构数组,分解结果数组:

from pyspark.sql import functions as F
from pyspark.sql.types import StructField, ArrayType, StringType, StructType

user_schema = ArrayType(
    StructType([
        StructField("username", StringType(), True),
        StructField("points", StringType(), True),
        StructField("active", StringType(), True)
    ])
)

df1 = (df.withColumn("user", F.from_json("user", user_schema))
       .selectExpr("inline(user)", "dob_year", "gender", "member")
       )

df1.show()
#+----------+------+------+--------+------+------+
#|  username|points|active|dob_year|gender|member|
#+----------+------+------+--------+------+------+
#|      aabb|   200|  true|    1999|     F|  true|
#|    bbaa22|     0| false|    1999|     F|  true|
#|   topuser|   855| false|    1974|     M|  true|
#| myprimary|  1050|  true|    1992|     M|  true|
#|    mylurk|   100|  true|    1992|     M|  true|
#|closedlurk|    50| false|    1992|     M|  true|
#+----------+------+------+--------+------+------+