在派生自其他列的数据框中添加新列(Spark)

Adding a new column in Data Frame derived from other columns (Spark)

我正在使用 Spark 1.3.0 和 Python。我有一个数据框,我希望添加一个从其他列派生的附加列。像这样,

>>old_df.columns
[col_1, col_2, ..., col_m]

>>new_df.columns
[col_1, col_2, ..., col_m, col_n]

其中

col_n = col_3 - col_4

如何在 PySpark 中执行此操作?

一种实现方法是使用 withColumn 方法:

old_df = sqlContext.createDataFrame(sc.parallelize(
    [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))

new_df = old_df.withColumn('col_n', old_df.col_1 - old_df.col_2)

或者您可以在已注册的 table 上使用 SQL:

old_df.registerTempTable('old_df')
new_df = sqlContext.sql('SELECT *, col_1 - col_2 AS col_n FROM old_df')

此外,我们可以使用udf

from pyspark.sql.functions import udf,col
from pyspark.sql.types import IntegerType
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)
old_df = sqlContext.createDataFrame(sc.parallelize(
    [(0, 1), (1, 3), (2, 5)]), ('col_1', 'col_2'))
function = udf(lambda col1, col2 : col1-col2, IntegerType())
new_df = old_df.withColumn('col_n',function(col('col_1'), col('col_2')))
new_df.show()

您可以通过以下方式添加新列:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([[1, 2], [3, 4]], ['col1', 'col2'])
df.show()

+----+----+
|col1|col2|
+----+----+
|   1|   2|
|   3|   4|
+----+----+

-- 使用方法withColumn:

import pyspark.sql.functions as F

df.withColumn('col3', F.col('col2') - F.col('col1')) # col function

df.withColumn('col3', df['col2'] - df['col1']) # bracket notation

df.withColumn('col3', df.col2 - df.col1) # dot notation

-- 使用方法select:

df.select('*', (F.col('col2') - F.col('col1')).alias('col3'))

表达式 '*' returns 所有列。

-- 使用方法selectExpr:

df.selectExpr('*', 'col2 - col1 as col3')

-- 使用 SQL:

df.createOrReplaceTempView('df_view')

spark.sql('select *, col2 - col1 as col3 from df_view')

结果:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   1|   2|   1|
|   3|   4|   1|
+----+----+----+