更新 spark 中的数据框列
Updating a dataframe column in spark
查看新的 spark DataFrame API,不清楚是否可以修改 dataframe 列。
我将如何更改数据框的行 x
列 y
中的值?
在 pandas
中将是:
df.ix[x,y] = new_value
编辑:合并下面所说的,你不能修改现有的数据框,因为它是不可变的,但你可以return一个新的数据框修改。
如果您只想根据条件替换列中的值,例如 np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
如果您想对列执行一些操作并创建一个新列添加到数据框中:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
如果您希望新列与旧列同名,您可以添加额外的步骤:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
DataFrames
基于 RDD。 RDD 是不可变结构,不允许在现场更新元素。要更改值,您需要通过使用 SQL 类 DSL 或 RDD 操作(例如 map
.
转换原始 DataFrame 来创建新的 DataFrame
强烈推荐的幻灯片:Introducing DataFrames in Spark for Large Scale Data Science。
虽然您不能像这样修改列,但您可以对列进行操作,return 反映该更改的新 DataFrame。为此,您首先要创建一个 UserDefinedFunction
来实现要应用的操作,然后有选择地将该函数仅应用于目标列。在 Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
现在具有与 old_df
相同的架构(假设 old_df.target_column
也是 StringType
类型)但列 target_column
中的所有值将是 new_value
.
正如 maasg 所说,您可以根据应用于旧 DataFrame 的映射结果创建新的 DataFrame。具有两行的给定 DataFrame df
的示例:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
请注意,如果列的类型发生变化,您需要为其提供正确的架构而不是 df.schema
。查看 org.apache.spark.sql.Row
的 api 以获取可用方法:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[更新] 或者在 Scala 中使用 UDF:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
如果列名需要保持不变,您可以将其重新命名:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
通常在更新列时,我们希望将旧值映射到新值。这是在没有 UDF 的情况下在 pyspark 中执行此操作的方法:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
从 pyspark.sql.functions 导入 col, when 并将第五列更新为 integer(0,1,2) 基于将字符串(字符串 a,字符串 b,字符串 c)转换为新的 DataFrame。
from pyspark.sql.functions import col, when
data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))
查看新的 spark DataFrame API,不清楚是否可以修改 dataframe 列。
我将如何更改数据框的行 x
列 y
中的值?
在 pandas
中将是:
df.ix[x,y] = new_value
编辑:合并下面所说的,你不能修改现有的数据框,因为它是不可变的,但你可以return一个新的数据框修改。
如果您只想根据条件替换列中的值,例如 np.where
:
from pyspark.sql import functions as F
update_func = (F.when(F.col('update_col') == replace_val, new_value)
.otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
如果您想对列执行一些操作并创建一个新列添加到数据框中:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def my_func(col):
do stuff to column here
return transformed_value
# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
如果您希望新列与旧列同名,您可以添加额外的步骤:
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
DataFrames
基于 RDD。 RDD 是不可变结构,不允许在现场更新元素。要更改值,您需要通过使用 SQL 类 DSL 或 RDD 操作(例如 map
.
强烈推荐的幻灯片:Introducing DataFrames in Spark for Large Scale Data Science。
虽然您不能像这样修改列,但您可以对列进行操作,return 反映该更改的新 DataFrame。为此,您首先要创建一个 UserDefinedFunction
来实现要应用的操作,然后有选择地将该函数仅应用于目标列。在 Python:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])
new_df
现在具有与 old_df
相同的架构(假设 old_df.target_column
也是 StringType
类型)但列 target_column
中的所有值将是 new_value
.
正如 maasg 所说,您可以根据应用于旧 DataFrame 的映射结果创建新的 DataFrame。具有两行的给定 DataFrame df
的示例:
val newDf = sqlContext.createDataFrame(df.map(row =>
Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)
请注意,如果列的类型发生变化,您需要为其提供正确的架构而不是 df.schema
。查看 org.apache.spark.sql.Row
的 api 以获取可用方法:https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html
[更新] 或者在 Scala 中使用 UDF:
import org.apache.spark.sql.functions._
val toLong = udf[Long, String] (_.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")
如果列名需要保持不变,您可以将其重新命名:
modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
通常在更新列时,我们希望将旧值映射到新值。这是在没有 UDF 的情况下在 pyspark 中执行此操作的方法:
# update df[update_col], mapping old_value --> new_value
from pyspark.sql import functions as F
df = df.withColumn(update_col,
F.when(df[update_col]==old_value,new_value).
otherwise(df[update_col])).
从 pyspark.sql.functions 导入 col, when 并将第五列更新为 integer(0,1,2) 基于将字符串(字符串 a,字符串 b,字符串 c)转换为新的 DataFrame。
from pyspark.sql.functions import col, when
data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))