在 PySpark UDF 中使用不同的数据框

Use different dataframe inside PySpark UDF

我得到了一个数据框 (df1),我在其中列出了一些时间范围:

| start | end | event name |
|-------|-----|------------|
| 1     | 3   | name_1     |
| 3     | 5   | name_2     |
| 2     | 6   | name_3     |

在这些时间范围内,我想从另一个数据帧 (df2) 中提取一些数据。例如,我想用指定时间范围内 df2 的平均测量值扩展 df1

| timestamp | measurement |
|-----------|-------------|
| 1         | 5           |
| 2         | 7           |
| 3         | 5           |
| 4         | 9           |
| 5         | 2           |
| 6         | 7           |
| 7         | 8           |

我在考虑一个 UDF 函数,它按时间戳过滤 df2 并计算平均值。但是在 UDF 中我不能引用两个数据帧:

def get_avg(start, end):
  return df2.filter(df2.timestamp > start & df2.timestamp < end).agg({"average": "avg"})

udf_1 = f.udf(get_avg)

df1.select(udf_1('start', 'end').show()

这将引发错误 TypeError: cannot pickle '_thread.RLock' object

我该如何有效地解决这个问题?

在这种情况下,无需使用 UDF,您只需在时间戳

确定的范围内使用 join
import pyspark.sql.functions as F

df1.join(df2, on=[(df2.timestamp > df1.start) & (df2.timestamp < df1.end)]) \
  .groupby('start', 'end', 'event_name') \
  .agg(F.mean('measurement').alias('avg')) \
  .show()

+-----+---+----------+-----------------+
|start|end|event_name|              avg|
+-----+---+----------+-----------------+
|    1|  3|    name_1|              7.0|
|    3|  5|    name_2|              9.0|
|    2|  6|    name_3|5.333333333333333|
+-----+---+----------+-----------------+