pySpark DataFrames Aggregation Functions with SciPy
I've tried a few different scenarios to get Spark 1.3 DataFrames to handle things like SciPy kurtosis or NumPy std. Here is the example code, but it just hangs on a 10x10 dataset (10 rows with 10 columns). I've tried:
print df.groupBy().agg(kurtosis(df.offer_id)).collect()
print df.agg(kurtosis(df.offer_ID)).collect()
But this works fine:
print df.agg(F.min(df.offer_id), F.min(df.decision_id)).collect()
My guess is that this works because F, from: from pyspark.sql import functions as F, provides built-in SQL functions. How would I do something like kurtosis on the dataset using DataFrames?
This also just hangs:
print df.map(kurtosis(df.offer_id)).collect()
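For reference, the number I'm after is just SciPy's kurtosis applied to a column. A driver-side sketch like the one below (assuming the column fits in driver memory, and using my offer_id column) gives that number, but it obviously isn't a distributed DataFrame solution:
from scipy.stats import kurtosis
# Pull one column back to the driver and hand it to SciPy directly.
# Works for a tiny 10x10 test set, but it is not a distributed solution.
values = [row.offer_id for row in df.select(df.offer_id).collect()]
print kurtosis(values)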
Unfortunately, Spark SQL's support for Python UDFs is a bit lacking at the moment. I've been looking at adding some UDFs in Scala and making them callable from Python for a project I'm working on, so I did a quick proof of concept using kurtosis as the UDAF to implement. The branch currently lives at https://github.com/holdenk/sparklingpandas/tree/add-kurtosis-support
The first step is to define our UDAF in Scala. This probably isn't ideal, but here is an implementation:
object functions {
  def kurtosis(e: Column): Column = new Column(Kurtosis(EvilSqlTools.getExpr(e)))
}

case class Kurtosis(child: Expression) extends AggregateExpression {
  def this() = this(null)

  override def children = child :: Nil
  override def nullable: Boolean = true
  override def dataType: DataType = DoubleType
  override def toString: String = s"Kurtosis($child)"
  override def newInstance() = new KurtosisFunction(child, this)
}

case class KurtosisFunction(child: Expression, base: AggregateExpression) extends AggregateFunction {
  def this() = this(null, null)

  var data = scala.collection.mutable.ArrayBuffer.empty[Any]

  override def update(input: Row): Unit = {
    data += child.eval(input)
  }

  // This function seems shaaady
  // TODO: Do something more reasonable
  private def toDouble(x: Any): Double = {
    x match {
      case x: NumericType => EvilSqlTools.toDouble(x.asInstanceOf[NumericType])
      case x: Long => x.toDouble
      case x: Int => x.toDouble
      case x: Double => x
    }
  }

  override def eval(input: Row): Any = {
    if (data.isEmpty) {
      println("No data???")
      null
    } else {
      val inputAsDoubles = data.toList.map(toDouble)
      println("computing on input " + inputAsDoubles)
      val inputArray = inputAsDoubles.toArray
      val apacheKurtosis = new ApacheKurtosis()
      val result = apacheKurtosis.evaluate(inputArray, 0, inputArray.size)
      println("result " + result)
      Cast(Literal(result), DoubleType).eval(null)
    }
  }
}
Then we can use logic similar to Spark SQL's functions.py implementation:
"""Our magic extend functions. Here lies dragons and a sleepy holden."""
from py4j.java_collections import ListConverter
from pyspark import SparkContext
from pyspark.sql.dataframe import Column, _to_java_column
__all__ = []
def _create_function(name, doc=""):
""" Create a function for aggregator by name"""
def _(col):
sc = SparkContext._active_spark_context
jc = getattr(sc._jvm.com.sparklingpandas.functions, name)(col._jc if isinstance(col, Column) else col)
return Column(jc)
_.__name__ = name
_.__doc__ = doc
return _
_functions = {
'kurtosis': 'Calculate the kurtosis, maybe!',
}
for _name, _doc in _functions.items():
globals()[_name] = _create_function(_name, _doc)
del _name, _doc
__all__ += _functions.keys()
__all__.sort()
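For clarity, the kurtosis wrapper generated above is equivalent to writing the following by hand; this is just an illustrative unrolling of _create_function, and the com.sparklingpandas.functions object it reaches through Py4J is the Scala code from the previous step:
from pyspark import SparkContext
from pyspark.sql.dataframe import Column

def kurtosis(col):
    """Calculate the kurtosis, maybe!"""
    sc = SparkContext._active_spark_context
    # Hand the Python Column's underlying Java Column to the Scala helper,
    # then wrap the resulting Java Column back up for the Python DataFrame API.
    jc = sc._jvm.com.sparklingpandas.functions.kurtosis(
        col._jc if isinstance(col, Column) else col)
    return Column(jc)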
Then we can go ahead and call it as a UDAF like so:
from sparklingpandas.custom_functions import *
from pyspark.sql import Row
import random

input = range(1,6) + range(1,6) + range(1,6) + range(1,6) + range(1,6) + range(1,6)
df1 = sqlContext.createDataFrame(sc.parallelize(input) \
    .map(lambda i: Row(single=i, rand=random.randint(0, 100000))))
df1.collect()

import pyspark.sql.functions as F
x = df1.groupBy(df1.single).agg(F.min(df1.rand))
x.collect()

j = df1.groupBy(df1.single).agg(kurtosis(df1.rand))
j.collect()
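As a side note for readers on newer releases: starting with Spark 1.6, pyspark.sql.functions ships a built-in kurtosis aggregate, so none of the Scala plumbing above is needed there and the final groupBy/agg call becomes simply:
import pyspark.sql.functions as F
# Built-in kurtosis aggregate (available in Spark 1.6 and later).
df1.groupBy(df1.single).agg(F.kurtosis(df1.rand)).collect()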