对 window 函数取平均值会导致 StackOverflowError

Averaging over window function leads to StackOverflowError

我正在尝试使用 window 函数确定 Dataframe 列中日期之间的平均时间跨度。但是,具体化 Dataframe 会引发 Java 异常。

考虑以下示例:

from pyspark import SparkContext
from pyspark.sql import HiveContext, Window, functions
from datetime import datetime

sc = SparkContext()
sq = HiveContext(sc)

data = [
    [datetime(2014,1,1)],
    [datetime(2014,2,1)],
    [datetime(2014,3,1)],
    [datetime(2014,3,6)],
    [datetime(2014,8,23)],
    [datetime(2014,10,1)],
]
df = sq.createDataFrame(data, schema=['ts'])

ts = functions.col('ts')

w = Window.orderBy(ts)
diff = functions.datediff(
    ts,
    functions.lag(ts, count=1).over(w)
)

avg_diff = functions.avg(diff)

虽然 df.select(diff.alias('diff')).show() 正确呈现为

+----+
|diff|
+----+
|null|
|  31|
|  28|
|   5|
| 170|
|  39|
+----+

df.select(avg_diff).show() 得到 java.lang.WhosebugError.

我认为这应该有效吗?如果是这样,我做错了什么,我可以做些什么呢?

我在 Spark 1.6

上使用 Python API

当我做 df2 = df.select(diff.alias('diff')) 然后做

df2.select(functions.avg('diff'))

没有错误。不幸的是,这不是我当前设置中的一个选项。

它看起来像是 Catalyst 中的错误但是。链接方法应该可以正常工作:

df.select(diff.alias('diff')).agg(functions.avg('diff'))

尽管如此,我还是要小心。 Window 函数不应用于执行全局(没有 PARTITION BY 子句)操作。这些将所有数据移动到单个分区并执行顺序扫描。在这里使用 RDD 可能是更好的选择。