How to convert a column from string to array in PySpark
I have a dataframe, converted from an inherited dataset, that looks like this:
data = [
    ("[]","2000","M",False),
    ("[{'username':'aabb','points':'200','active':'true'}, {'username':'bbaa22','points':'0','active':'false'}]","1999","F",True),
    ("[{'username':'topuser','points':'855','active':'false'}]","1974","M",True),
    ("[]","2005","F",False),
    ("[{'username':'myprimary','points':'1050','active':'true'}, {'username':'mylurk','points':'100','active':'true'}, {'username':'closedlurk','points':'50','active':'false'}]","1992","M",True)
]
columns = ["user", "dob_year", "gender", "member"]
df = spark.createDataFrame(data, columns)
+--------------------+--------+------+------+
| user|dob_year|gender|member|
+--------------------+--------+------+------+
| []| 2000| M| false|
|[{'username':'aab...| 1999| F| true|
|[{'username':'top...| 1974| M| true|
| []| 2005| F| false|
|[{'username':'myp...| 1992| M| true|
+--------------------+--------+------+------+
I need to extract some elements from the user column, so I tried using the PySpark explode function:
from pyspark.sql.functions import explode
df2 = df.select(explode(df.user), df.dob_year)
When I try this, I get the following error:
AnalysisException: cannot resolve 'explode(user)' due to data type mismatch: input to function explode should be array or map type, not string;
When I run df.printSchema(), I see that the user column is a string, not the list I need.
I also tried converting the string in the column to an array by creating a UDF:
import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, StringType  # needed for the cast below
df2 = df.withColumn("user", str2list_udf(f.col("user"))).withColumn("user", df.user.cast(ArrayType(StringType())))
When I do this, I get the following error:
AnalysisException: cannot resolve 'user' due to data type mismatch: cannot cast string to array;
How can I cast or convert the data in this column to an array, so that I can use the explode function and parse the individual keys into their own columns (for example: username, points, and active)? If explode is not the best approach, is there another route I should take?
In case it helps, here is the UDF I created:
from pyspark.sql.functions import udf

def str2list(x):
    if x == '[]':
        return list()
    else:
        return list(x)

str2list_udf = udf(lambda x: str2list(x))
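(A note on why this UDF can't work as intended: list(x) on a string only splits it into individual characters, it does not parse the bracketed syntax. And because the sample data uses single quotes, it isn't valid JSON either, though Python's ast.literal_eval can parse it. A minimal plain-Python sketch of the difference, using one row from the sample data:)

```python
import ast

s = "[{'username':'topuser','points':'855','active':'false'}]"

# list() just iterates the characters of the string
chars = list(s)
print(chars[:3])  # ['[', '{', "'"]

# ast.literal_eval parses the single-quoted, Python-literal syntax
parsed = ast.literal_eval(s)
print(parsed[0]["username"])  # topuser
```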
Thanks
Use from_json to convert the stringified array into an array of structs, then explode the resulting array:
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, ArrayType, StringType, StructType

user_schema = ArrayType(
    StructType([
        StructField("username", StringType(), True),
        StructField("points", StringType(), True),
        StructField("active", StringType(), True)
    ])
)

df1 = (df.withColumn("user", F.from_json("user", user_schema))
         .selectExpr("inline(user)", "dob_year", "gender", "member")
       )
df1.show()
#+----------+------+------+--------+------+------+
#| username|points|active|dob_year|gender|member|
#+----------+------+------+--------+------+------+
#| aabb| 200| true| 1999| F| true|
#| bbaa22| 0| false| 1999| F| true|
#| topuser| 855| false| 1974| M| true|
#| myprimary| 1050| true| 1992| M| true|
#| mylurk| 100| true| 1992| M| true|
#|closedlurk| 50| false| 1992| M| true|
#+----------+------+------+--------+------+------+