如何在 PySpark 中的多个时间间隔内使用 .filter() 操作?
How to use .filter() operation over multiple time intervals in PySpark?
我想优化以下功能。我正在使用广播内部连接,我认为它不够快。
我有一个具有以下属性的区间数据帧:timestamp_start、timestamp_end
以及一个时间序列数据帧元组,其属性为:时间戳、值。
函数然后 returns 属于区间之一的所有值:
def filter_intervals(intervals, df):
df = df.join(broadcast(intervals),
[df.timestamp >= intervals.timestamp_start,
df.timestamp <= intervals.timestamp_end],
how='inner')
return df
我应该如何重写一个更高效的函数?
如果 intervals
不是很大,我会尝试创建一个自定义函数 is_in_interval(t)
。您可以按 timestamp_start
对区间进行初步排序,然后使用二进制搜索来查找区间(如果有的话)。此外,我不会加入数据集,而是创建一个 UDF。像这样:
import pyspark.sql.functions as F
from pyspark.sql.types import BooleanType
def filter_intervals(sorted_intervals, df):
def is_in_interval(t):
...
is_in_interval_udf = F.udf(is_valid_id, BooleanType())
return df.filter(is_in_interval_udf("timestamp"))
在这种情况下,您不必广播 intervals
,因为每个执行程序上都有 intervals
的本地副本。但是同样,只有当 intervals
不是太大并且适合执行者的内存时,这才会有效。
这是 post 关于 Python 中的二进制搜索的文章:
Binary search (bisection) in Python
我想优化以下功能。我正在使用广播内部连接,我认为它不够快。
我有一个具有以下属性的区间数据帧:timestamp_start、timestamp_end 以及一个时间序列数据帧元组,其属性为:时间戳、值。
函数然后 returns 属于区间之一的所有值:
def filter_intervals(intervals, df):
df = df.join(broadcast(intervals),
[df.timestamp >= intervals.timestamp_start,
df.timestamp <= intervals.timestamp_end],
how='inner')
return df
我应该如何重写一个更高效的函数?
如果 intervals
不是很大,我会尝试创建一个自定义函数 is_in_interval(t)
。您可以按 timestamp_start
对区间进行初步排序,然后使用二进制搜索来查找区间(如果有的话)。此外,我不会加入数据集,而是创建一个 UDF。像这样:
import pyspark.sql.functions as F
from pyspark.sql.types import BooleanType
def filter_intervals(sorted_intervals, df):
def is_in_interval(t):
...
is_in_interval_udf = F.udf(is_valid_id, BooleanType())
return df.filter(is_in_interval_udf("timestamp"))
在这种情况下,您不必广播 intervals
,因为每个执行程序上都有 intervals
的本地副本。但是同样,只有当 intervals
不是太大并且适合执行者的内存时,这才会有效。
这是 post 关于 Python 中的二进制搜索的文章: Binary search (bisection) in Python