等同于 Scala Dataset#transform 方法的 Pyspark 转换方法
Pyspark transform method that's equivalent to the Scala Dataset#transform method
Spark Scala API 有一个 Dataset#transform
方法可以轻松链接自定义 DataFrame 转换,如下所示:
val weirdDf = df
.transform(myFirstCustomTransformation)
.transform(anotherCustomTransformation)
我没有看到 pyspark in the documentation 的等效 transform
方法。
是否有 PySpark 方法来链接自定义转换?
如果不是,pyspark.sql.DataFrame
class 如何通过猴子修补来添加 transform
方法?
更新
变换方法是added to PySpark as of PySpark 3.0。
实施:
from pyspark.sql.dataframe import DataFrame
def transform(self, f):
return f(self)
DataFrame.transform = transform
用法:
spark.range(1).transform(lambda df: df.selectExpr("id * 2"))
使用 SQLTransformer 对象(或任何其他 Transformer)的 Transformer 管道是一种 Spark 解决方案,它使链接转换变得容易。例如:
from pyspark.ml.feature import SQLTransformer
from pyspark.ml import Pipeline, PipelineModel
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlSelectExpr = SQLTransformer(statement="SELECT *, (id * 2) AS v5 FROM __THIS__")
pipeline = Pipeline(stages=[sqlTrans, sqlSelectExpr])
pipelineModel = pipeline.fit(df)
pipelineModel.transform(df).show()
当所有转换都是如上所述的简单表达式时,另一种链接方法是使用单个 SQLTransformer 和字符串操作:
transforms = ['(v1 + v2) AS v3',
'(v1 * v2) AS v4',
'(id * 2) AS v5',
]
selectExpr = "SELECT *, {} FROM __THIS__".format(",".join(transforms))
sqlSelectExpr = SQLTransformer(statement=selectExpr)
sqlSelectExpr.transform(df).show()
请记住,Spark SQL 转换可以优化,并且比定义为 Python 用户定义函数 (UDF) 的转换更快。
Spark Scala API 有一个 Dataset#transform
方法可以轻松链接自定义 DataFrame 转换,如下所示:
val weirdDf = df
.transform(myFirstCustomTransformation)
.transform(anotherCustomTransformation)
我没有看到 pyspark in the documentation 的等效 transform
方法。
是否有 PySpark 方法来链接自定义转换?
如果不是,pyspark.sql.DataFrame
class 如何通过猴子修补来添加 transform
方法?
更新
变换方法是added to PySpark as of PySpark 3.0。
实施:
from pyspark.sql.dataframe import DataFrame
def transform(self, f):
return f(self)
DataFrame.transform = transform
用法:
spark.range(1).transform(lambda df: df.selectExpr("id * 2"))
使用 SQLTransformer 对象(或任何其他 Transformer)的 Transformer 管道是一种 Spark 解决方案,它使链接转换变得容易。例如:
from pyspark.ml.feature import SQLTransformer
from pyspark.ml import Pipeline, PipelineModel
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlSelectExpr = SQLTransformer(statement="SELECT *, (id * 2) AS v5 FROM __THIS__")
pipeline = Pipeline(stages=[sqlTrans, sqlSelectExpr])
pipelineModel = pipeline.fit(df)
pipelineModel.transform(df).show()
当所有转换都是如上所述的简单表达式时,另一种链接方法是使用单个 SQLTransformer 和字符串操作:
transforms = ['(v1 + v2) AS v3',
'(v1 * v2) AS v4',
'(id * 2) AS v5',
]
selectExpr = "SELECT *, {} FROM __THIS__".format(",".join(transforms))
sqlSelectExpr = SQLTransformer(statement=selectExpr)
sqlSelectExpr.transform(df).show()
请记住,Spark SQL 转换可以优化,并且比定义为 Python 用户定义函数 (UDF) 的转换更快。