如何在 PySpark 中的多个时间间隔内使用 .filter() 操作?

How to use .filter() operation over multiple time intervals in PySpark?

我想优化以下功能。我正在使用广播内部连接,我认为它不够快。

我有一个具有以下属性的区间数据帧:timestamp_start、timestamp_end 以及一个时间序列数据帧元组,其属性为:时间戳、值。

函数然后 returns 属于区间之一的所有值:


def filter_intervals(intervals, df):

    df = df.join(broadcast(intervals),
        [df.timestamp >= intervals.timestamp_start,
         df.timestamp <= intervals.timestamp_end],
         how='inner')
    return df

我应该如何重写一个更高效的函数?

如果 intervals 不是很大,我会尝试创建一个自定义函数 is_in_interval(t)。您可以按 timestamp_start 对区间进行初步排序,然后使用二进制搜索来查找区间(如果有的话)。此外,我不会加入数据集,而是创建一个 UDF。像这样:

import pyspark.sql.functions as F
from pyspark.sql.types import BooleanType

def filter_intervals(sorted_intervals, df):

  def is_in_interval(t):
    ...

  is_in_interval_udf = F.udf(is_valid_id, BooleanType())

  return df.filter(is_in_interval_udf("timestamp"))

在这种情况下,您不必广播 intervals,因为每个执行程序上都有 intervals 的本地副本。但是同样,只有当 intervals 不是太大并且适合执行者的内存时,这才会有效。

这是 post 关于 Python 中的二进制搜索的文章: Binary search (bisection) in Python