在 PySpark UDF 中使用不同的数据框
Use different dataframe inside PySpark UDF
我得到了一个数据框 (df1
),我在其中列出了一些时间范围:
| start | end | event name |
|-------|-----|------------|
| 1 | 3 | name_1 |
| 3 | 5 | name_2 |
| 2 | 6 | name_3 |
在这些时间范围内,我想从另一个数据帧 (df2
) 中提取一些数据。例如,我想用指定时间范围内 df2
的平均测量值扩展 df1
。
| timestamp | measurement |
|-----------|-------------|
| 1 | 5 |
| 2 | 7 |
| 3 | 5 |
| 4 | 9 |
| 5 | 2 |
| 6 | 7 |
| 7 | 8 |
我在考虑一个 UDF 函数,它按时间戳过滤 df2
并计算平均值。但是在 UDF 中我不能引用两个数据帧:
def get_avg(start, end):
return df2.filter(df2.timestamp > start & df2.timestamp < end).agg({"average": "avg"})
udf_1 = f.udf(get_avg)
df1.select(udf_1('start', 'end').show()
这将引发错误 TypeError: cannot pickle '_thread.RLock' object
。
我该如何有效地解决这个问题?
在这种情况下,无需使用 UDF,您只需在时间戳
确定的范围内使用 join
import pyspark.sql.functions as F
df1.join(df2, on=[(df2.timestamp > df1.start) & (df2.timestamp < df1.end)]) \
.groupby('start', 'end', 'event_name') \
.agg(F.mean('measurement').alias('avg')) \
.show()
+-----+---+----------+-----------------+
|start|end|event_name| avg|
+-----+---+----------+-----------------+
| 1| 3| name_1| 7.0|
| 3| 5| name_2| 9.0|
| 2| 6| name_3|5.333333333333333|
+-----+---+----------+-----------------+
我得到了一个数据框 (df1
),我在其中列出了一些时间范围:
| start | end | event name |
|-------|-----|------------|
| 1 | 3 | name_1 |
| 3 | 5 | name_2 |
| 2 | 6 | name_3 |
在这些时间范围内,我想从另一个数据帧 (df2
) 中提取一些数据。例如,我想用指定时间范围内 df2
的平均测量值扩展 df1
。
| timestamp | measurement |
|-----------|-------------|
| 1 | 5 |
| 2 | 7 |
| 3 | 5 |
| 4 | 9 |
| 5 | 2 |
| 6 | 7 |
| 7 | 8 |
我在考虑一个 UDF 函数,它按时间戳过滤 df2
并计算平均值。但是在 UDF 中我不能引用两个数据帧:
def get_avg(start, end):
return df2.filter(df2.timestamp > start & df2.timestamp < end).agg({"average": "avg"})
udf_1 = f.udf(get_avg)
df1.select(udf_1('start', 'end').show()
这将引发错误 TypeError: cannot pickle '_thread.RLock' object
。
我该如何有效地解决这个问题?
在这种情况下,无需使用 UDF,您只需在时间戳
确定的范围内使用join
import pyspark.sql.functions as F
df1.join(df2, on=[(df2.timestamp > df1.start) & (df2.timestamp < df1.end)]) \
.groupby('start', 'end', 'event_name') \
.agg(F.mean('measurement').alias('avg')) \
.show()
+-----+---+----------+-----------------+
|start|end|event_name| avg|
+-----+---+----------+-----------------+
| 1| 3| name_1| 7.0|
| 3| 5| name_2| 9.0|
| 2| 6| name_3|5.333333333333333|
+-----+---+----------+-----------------+