在 pyspark 数组上应用 withColumn
Apply withColumn on pyspark array
这是我的代码:
from pyspark.sql import *
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee1, employee2])
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)
我想加入数组中的名字和姓氏。
from pyspark.sql import functions as sf
df2 = df1.withColumn("employees.FullName", sf.concat(sf.col('employees.firstName'), sf.col('employees.lastName')))
df2.printSchema()
root
|-- department: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
|-- employees: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- salary: long (nullable = true)
|-- employees.FullName: array (nullable = true)
| |-- element: string (containsNull = true)
我的新专栏FullName在父级,如何把它们放在数组里。
root
|-- department: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
|-- employees: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- salary: long (nullable = true)
| | |-- FullName: string (containsNull = true)
一种方法是分解你的结构数组,使用inline_outer
,然后使用concat_ws
获取您的全名和 assemble 所有这些都使用 array
、struct
。
from pyspark.sql import functions as F
df1.selectExpr("department","""inline_outer(employees)""")\
.withColumn("FullName", F.concat_ws(" ","firstName","lastName"))\
.select("department", F.array(F.struct(*[F.col(x).alias(x) for x in\
['firstName','lastName','email','salary','FullName']]))\
.alias("employees")).printSchema()
#root
#|-- department: struct (nullable = true)
#| |-- id: string (nullable = true)
#| |-- name: string (nullable = true)
#|-- employees: array (nullable = false)
#| |-- element: struct (containsNull = false)
#| | |-- firstName: string (nullable = true)
#| | |-- lastName: string (nullable = true)
#| | |-- email: string (nullable = true)
#| | |-- salary: long (nullable = true)
#| | |-- FullName: string (nullable = false)
这是我的代码:
from pyspark.sql import *
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee1, employee2])
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)
我想加入数组中的名字和姓氏。
from pyspark.sql import functions as sf
df2 = df1.withColumn("employees.FullName", sf.concat(sf.col('employees.firstName'), sf.col('employees.lastName')))
df2.printSchema()
root
|-- department: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
|-- employees: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- salary: long (nullable = true)
|-- employees.FullName: array (nullable = true)
| |-- element: string (containsNull = true)
我的新专栏FullName在父级,如何把它们放在数组里。
root
|-- department: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
|-- employees: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- salary: long (nullable = true)
| | |-- FullName: string (containsNull = true)
一种方法是分解你的结构数组,使用inline_outer
,然后使用concat_ws
获取您的全名和 assemble 所有这些都使用 array
、struct
。
from pyspark.sql import functions as F
df1.selectExpr("department","""inline_outer(employees)""")\
.withColumn("FullName", F.concat_ws(" ","firstName","lastName"))\
.select("department", F.array(F.struct(*[F.col(x).alias(x) for x in\
['firstName','lastName','email','salary','FullName']]))\
.alias("employees")).printSchema()
#root
#|-- department: struct (nullable = true)
#| |-- id: string (nullable = true)
#| |-- name: string (nullable = true)
#|-- employees: array (nullable = false)
#| |-- element: struct (containsNull = false)
#| | |-- firstName: string (nullable = true)
#| | |-- lastName: string (nullable = true)
#| | |-- email: string (nullable = true)
#| | |-- salary: long (nullable = true)
#| | |-- FullName: string (nullable = false)