PySpark split column to new dataframe with applied schema

How can I split a comma-separated string column into a new DataFrame with an applied schema?

For example, here is a PySpark DataFrame with two columns, id and value:

df = sc.parallelize([(1, "200,201,hello"), (2, "23,24,hi")]).toDF(["id", "value"])

I want to take the value column, split it, and load the result into a new DataFrame with the following schema applied:

from pyspark.sql.types import IntegerType, StringType, StructField, StructType

message_schema = StructType(
    [
        StructField("id", IntegerType()),
        StructField("value", IntegerType()),
        StructField("message", StringType()),
    ]
)

This works:

from pyspark.sql.functions import split

# split the value column on commas, then flatten each row's array into columns
df_split = (
    df.select(split(df.value, r",\s*"))
    .rdd.flatMap(lambda x: x)
    .toDF()
)
df_split.show()

But I still need to cast and rename the columns according to the schema:

from pyspark.sql.functions import col

# cast each positional column to the type defined in message_schema and rename it
df_split.select(
    [
        col(_name).cast(_schema.dataType).alias(_schema.name)
        for _name, _schema in zip(df_split.columns, message_schema)
    ]
).show()

Expected result:

+---+-----+-------+
| id|value|message|
+---+-----+-------+
|200|  201|  hello|
| 23|   24|     hi|
+---+-----+-------+
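(For comparison, the split and cast/rename steps above could presumably be folded into a single select driven by message_schema, avoiding the RDD round-trip. This is only an illustrative sketch, assuming the fields in message_schema are in the same order as the comma-separated values; parts and df_direct are made-up names:)

from pyspark.sql.functions import split

# split once, then pick each element by position and cast/rename it
# according to message_schema (assumes field order matches the CSV order)
parts = split(df.value, r",\s*")
df_direct = df.select(
    [
        parts.getItem(i).cast(field.dataType).alias(field.name)
        for i, field in enumerate(message_schema)
    ]
)
df_direct.show()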

For Spark 3+, there is a function from_csv that you can use to parse the comma-delimited string with the message_schema schema in DDL format:

import pyspark.sql.functions as F

df1 = df.withColumn(
    "message",
    F.from_csv("value", message_schema.simpleString())
).select("message.*")

df1.show()
#+---+-----+-------+
#| id|value|message|
#+---+-----+-------+
#|200|  201|  hello|
#| 23|   24|     hi|
#+---+-----+-------+

df1.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- value: integer (nullable = true)
# |-- message: string (nullable = true)
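If you don't want to build a StructType at all, from_csv also accepts the schema as a plain DDL string; here is a minimal sketch of that variant (the DDL string below is written out by hand rather than derived from message_schema, and df2 is an illustrative name):

import pyspark.sql.functions as F

# same parsing as above, with the schema spelled out as a DDL string
df2 = df.withColumn(
    "message",
    F.from_csv("value", "id INT, value INT, message STRING")
).select("message.*")

df2.show()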

I've written out every step in detail, so beginners can easily follow along.

# Importing libs
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Creating SparkSession
spark = SparkSession.builder.appName("StackOverflow").getOrCreate()

# Data
data = [(1, "200,201,hello"), (2, "23,24,hi")]

# Creating dataframe (columns default to _1 and _2)
df = spark.createDataFrame(data)

df = df.drop(df['_1'])
df.show()
+-------------+
|           _2|
+-------------+
|200,201,hello|
|     23,24,hi|
+-------------+
# Split _2 on commas and pull out each element as its own column
df1 = df.withColumn("id", split(df["_2"], ",").getItem(0))\
    .withColumn("Value", split(df["_2"], ",").getItem(1))\
    .withColumn("message", split(df["_2"], ",").getItem(2))

df1 = df1.drop("_2")
df1.show()

+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200|  201|  hello|
| 23|   24|     hi|
+---+-----+-------+

# Casting as per the defined schema
dff = (
    df1.withColumn("id", df1['id'].cast(IntegerType()))
    .withColumn("Value", df1['Value'].cast(IntegerType()))
    .withColumn("message", df1['message'].cast(StringType()))
)
dff.show()
+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200|  201|  hello|
| 23|   24|     hi|
+---+-----+-------+
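As a quick check that the casts actually produced the intended types, printing the schema of dff should show id and Value as integers and message as a string (the output below is what I would expect, not copied from a run):

dff.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- Value: integer (nullable = true)
# |-- message: string (nullable = true)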