如何在 PySpark 2.1.0 中定义事件时间 windows 的 UDAF
How to define UDAF over event-time windows in PySpark 2.1.0
我正在编写一个 Python 应用程序,它在一系列值上滑动 window,每个值都有一个时间戳。我想对滑动 window 中的值应用一个函数,以便从 N 个最新值计算得分,如图所示。我们已经使用 Python 库实现了该功能以利用 GPU。
我发现 Apache Spark 2.0 附带结构化流,它支持 window 事件时间操作。如果你想从一个.csv文件中读取一个有限序列的记录,并且想在这样的滑动中统计记录数window,你可以在PySpark中使用如下代码:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd
spark = SparkSession \
.builder \
.master('local[*]') \
.getOrCreate()
schema = StructType() \
.add('ts', 'timestamp') \
.add('value', 'double') \
sqlContext = SQLContext(spark)
lines = sqlContext \
.readStream \
.format('csv') \
.schema(schema) \
.load(path='file:///'+getcwd()+'/csv')
windowedCount = lines.groupBy(
window(lines.ts, '30 minutes', '10 minutes')
).agg({'value':'count'})
query = windowedCount \
.writeStream \
.outputMode('complete') \
.format('console') \
.start()
query.awaitTermination()
但是,我想在滑动 windows 上应用预定义聚合函数以外的 UDAF。根据https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg,可用的聚合函数只有avg、max、min、sum和count。
还不支持?如果是这样,PySpark 什么时候支持它?
表明可以在 Java 或 Scala 中定义 UserDefinedAggregateFunction,然后在 PySpark 中调用它。这看起来很有趣,但我想在滑动 windows 中对值应用我自己的 Python 函数。我想要一种纯粹的 Pythonic 方式。
p.s。让我知道 Python 中除 PySpark 之外的任何框架都可以解决此类问题(在 window 流上滑动时应用 UDAF)。
在 Spark <2.3 中,您不能这样做。
对于 Spark >= 2.3,这对于分组数据是可能的,但对于使用 "PySpark UDAFs with Pandas" 的 Windows 尚不可行。
目前,PySpark 不能 运行 Windows 上的用户定义函数。
这里有一个描述得很好的 SO 问题:Applying UDFs on GroupedData in PySpark (with functioning python example)
这是添加此功能的 JIRA 工单 - https://issues.apache.org/jira/browse/SPARK-10915
我正在编写一个 Python 应用程序,它在一系列值上滑动 window,每个值都有一个时间戳。我想对滑动 window 中的值应用一个函数,以便从 N 个最新值计算得分,如图所示。我们已经使用 Python 库实现了该功能以利用 GPU。
我发现 Apache Spark 2.0 附带结构化流,它支持 window 事件时间操作。如果你想从一个.csv文件中读取一个有限序列的记录,并且想在这样的滑动中统计记录数window,你可以在PySpark中使用如下代码:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType
from pyspark.sql.functions import window
from os import getcwd
spark = SparkSession \
.builder \
.master('local[*]') \
.getOrCreate()
schema = StructType() \
.add('ts', 'timestamp') \
.add('value', 'double') \
sqlContext = SQLContext(spark)
lines = sqlContext \
.readStream \
.format('csv') \
.schema(schema) \
.load(path='file:///'+getcwd()+'/csv')
windowedCount = lines.groupBy(
window(lines.ts, '30 minutes', '10 minutes')
).agg({'value':'count'})
query = windowedCount \
.writeStream \
.outputMode('complete') \
.format('console') \
.start()
query.awaitTermination()
但是,我想在滑动 windows 上应用预定义聚合函数以外的 UDAF。根据https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=agg#pyspark.sql.GroupedData.agg,可用的聚合函数只有avg、max、min、sum和count。
还不支持?如果是这样,PySpark 什么时候支持它?
p.s。让我知道 Python 中除 PySpark 之外的任何框架都可以解决此类问题(在 window 流上滑动时应用 UDAF)。
在 Spark <2.3 中,您不能这样做。
对于 Spark >= 2.3,这对于分组数据是可能的,但对于使用 "PySpark UDAFs with Pandas" 的 Windows 尚不可行。
目前,PySpark 不能 运行 Windows 上的用户定义函数。
这里有一个描述得很好的 SO 问题:Applying UDFs on GroupedData in PySpark (with functioning python example)
这是添加此功能的 JIRA 工单 - https://issues.apache.org/jira/browse/SPARK-10915