PySpark split column to new dataframe with applied schema
How can I split a string column on commas into a new DataFrame with a schema applied?
For example, here is a PySpark DataFrame with two columns (id and value):
df = sc.parallelize([(1, "200,201,hello"), (2, "23,24,hi")]).toDF(["id", "value"])
I want to take the value column, split it into a new DataFrame, and apply the following schema:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

message_schema = StructType(
    [
        StructField("id", IntegerType()),
        StructField("value", IntegerType()),
        StructField("message", StringType()),
    ]
)
What works is:
from pyspark.sql.functions import col, split

df_split = (
    df.select(split(df.value, r",\s*"))
    .rdd.flatMap(lambda x: x)
    .toDF()
)
df_split.show()
But I still need to cast and rename the columns according to the schema:
df_split.select(
    [
        col(_name).cast(_schema.dataType).alias(_schema.name)
        for _name, _schema in zip(df_split.columns, message_schema)
    ]
).show()
Expected result:
+---+-----+-------+
| id|value|message|
+---+-----+-------+
|200| 201| hello|
| 23| 24| hi|
+---+-----+-------+
For Spark 3+, there is a from_csv function that you can use to parse the comma-separated string, passing the message_schema schema in DDL format:
import pyspark.sql.functions as F

df1 = df.withColumn(
    "message",
    F.from_csv("value", message_schema.simpleString())
).select("message.*")
df1.show()
#+---+-----+-------+
#| id|value|message|
#+---+-----+-------+
#|200| 201| hello|
#| 23| 24| hi|
#+---+-----+-------+
df1.printSchema()
#root
# |-- id: integer (nullable = true)
# |-- value: integer (nullable = true)
# |-- message: string (nullable = true)
I've written out every step in detail, so even beginners can follow along easily.
# Importing libs
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Creating a SparkSession
spark = SparkSession.builder.appName("Whosebug").getOrCreate()

# Data
data = [(1, "200,201,hello"), (2, "23,24,hi")]

# Creating the DataFrame
df = spark.createDataFrame(data)
df = df.drop("_1")
df.show()
+-------------+
| _2|
+-------------+
|200,201,hello|
| 23,24,hi|
+-------------+
df1 = df.withColumn("id", split(df["_2"], ",").getItem(0))\
    .withColumn("Value", split(df["_2"], ",").getItem(1))\
    .withColumn("message", split(df["_2"], ",").getItem(2))
df1 = df1.drop("_2")
df1.show()
+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200| 201| hello|
| 23| 24| hi|
+---+-----+-------+
# Casting as per the defined schema
dff = (
    df1.withColumn("id", df1["id"].cast(IntegerType()))
    .withColumn("Value", df1["Value"].cast(IntegerType()))
    .withColumn("message", df1["message"].cast(StringType()))
)
dff.show()
+---+-----+-------+
| id|Value|message|
+---+-----+-------+
|200| 201| hello|
| 23| 24| hi|
+---+-----+-------+
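As a plain-Python illustration (no Spark required), the per-row work both answers perform, splitting one comma-separated string and casting the fields positionally, can be sketched like this. The parse_row name and the casts tuple are hypothetical, chosen just for this sketch:

```python
import csv

def parse_row(value, casts=(int, int, str)):
    # Split one CSV line and cast each field positionally,
    # mirroring the (id: int, value: int, message: string) schema
    fields = next(csv.reader([value]))
    return tuple(cast(field.strip()) for cast, field in zip(casts, fields))

print(parse_row("200,201,hello"))  # (200, 201, 'hello')
print(parse_row("23,24,hi"))       # (23, 24, 'hi')
```

This is only the scalar analogue: from_csv applies the same kind of parse-and-cast to every row distributed across the cluster, and also handles quoting and null semantics that this sketch ignores.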