如何将 DataFrame 作为输入传递给 Spark UDF?
How to pass DataFrame as input to Spark UDF?
我有一个数据框,我想对每一行应用一个函数。此功能取决于其他数据帧。
简化示例。我有如下三个数据框:
df = sc.parallelize([
['a', 'b', 1],
['c', 'd', 3]
]).toDF(('feat1', 'feat2', 'value'))
df_other_1 = sc.parallelize([
['a', 0, 1, 0.0],
['a', 1, 3, 0.1],
['a', 3, 10, 1.0],
['c', 0, 10, 0.2],
['c', 10, 25, 0.5]
]).toDF(('feat1', 'lower', 'upper', 'score'))
df_other_2 = sc.parallelize([
['b', 0, 4, 0.1],
['b', 4, 20, 0.5],
['b', 20, 30, 1.0],
['d', 0, 5, 0.05],
['d', 5, 22, 0.9]
]).toDF(('feat1', 'lower', 'upper', 'score'))
对于 df
的每一行,我想从 df_other_1
和 df_other_2
中收集 feat1
和 feat2
的唯一上限值,即第一行,唯一值是 (1, 3, 10, 4, 20, 30)。然后,我会将它们排序为 (30, 20, 10, 4, 3, 1) 并添加到前面,第一个数字之上的一个数字。 df
会变成这样:
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
['c', 'd', 3, [26, 25, 22, 10, 5]]
]).toDF(('feat1', 'feat2', 'value', 'lst'))
然后,对于 df
的每一行和 lst
的每个相应值,我想从两个 df_other_1
中计算 score
的总和和 df_other_2
,其中 lst
的每个值都在 upper
和 lower
范围内。我的目标是在总分高于某个阈值(例如 1.4)的每个 lst
中找到最低值。
下面是总分的计算方法。因此,对于 df
的第一行,lst
的第一个值是 31。在 df_other_1
中 feat1
,它在最高桶之上,因此它会得到一个分数共 1 个。df_other_2
相同。因此,总分将是 1+1 =2。对于值 10(同样是第一行),总分将为 1 + 0.5 = 1.5。
这就是 df
最后的样子:
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))
实际上我正在寻找这些目标值 4
和 25
。中间步骤并不重要。
============================================= =============================
这是我到目前为止尝试过的方法:
def get_threshold_for_row(feat1, feat2, threshold):
this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
this_df_other_2 = df_other_2.filter(col('feat1') == feat2)
values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
values_feat_1.append(values_feat_1[-1] + 1)
values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
values_feat_2.append(values_feat_2[-1] + 1)
values = values_feat_1 + values_feat_2
values = list(set(values)) #Keep unique values
values.sort(reverse=True) #Sort from largest to smallest
df_1_score = df_2_score = 0
prev_value = 10000 #Any large number
prev_score = 10000
for value in values:
df_1_score = get_score_for_key(this_df_other_1, 'feat_1', feat_1, value)
df_2_score = get_score_for_key(this_df_other_2, 'feat_1', feat_2, value)
total_score = df_1_score + df_2_score
if total_score < threshold and prev_score >= threshold:
return prev_value
prev_score = total_score
prev_value = value
def is_dataframe_empty(df):
return len(df.take(1)) == 0
def get_score_for_key(scores_df, grouping_key, this_id, value):
if is_dataframe_empty(scores_df):
return 0.0
w = Window.partitionBy([grouping_key]).orderBy(col('upper'))
scores_df_tmp = scores_df.withColumn("prev_value", lead(scores_df.upper).over(w))\
.withColumn("is_last", when(col('prev_value').isNull(), 1).otherwise(0))\
.drop('prev_value')
scores_df_tmp = scores_df_tmp.withColumn("next_value", lag(scores_df_tmp.upper).over(w))\
.withColumn("is_first", when(col('next_value').isNull(), 1).otherwise(0))\
.drop('next_value').cache()
grouping_key_score = scores_df_tmp.filter((col(grouping_key) == this_id) &
(((value >= col('from_value')) & (value < col('to_value'))) |
((value >= col('to_value')) & (col('is_last') == 1)) |
((value < col('from_value')) & (col('is_first') == 1)) |
(col('from_value').isNull()))) \
.withColumn('final_score', when(value <= col('to_value'), col('score')).otherwise(1.0)) \
.collect()[0]['final_score']
return grouping_key_score
df.rdd.map(lambda r: (r['feat_1'], r['feat_2'])) \
.map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4)))
.toDF()
但我得到:
AttributeError: 'Py4JError' object has no attribute 'message'
抱歉这么久 post。有什么想法吗?
I have a dataframe and I want to apply a function to each row. This function depends of other dataframes.
tl;dr 这在 UDF 中是不可能的。
在最广泛的意义上,UDF 是一个函数(实际上是一个 Catalyst 表达式),它接受零个或多个列值(作为列引用)。
如果 UDF 是用户定义的聚合函数 (UDAF),则 UDF 只能处理在最广泛的情况下可能是整个 DataFrame 的记录。
如果你想在一个 UDF 中处理多个 DataFrame,你必须 join
DataFrames 有你想用于 UDF 的列。
我有一个数据框,我想对每一行应用一个函数。此功能取决于其他数据帧。
简化示例。我有如下三个数据框:
df = sc.parallelize([
['a', 'b', 1],
['c', 'd', 3]
]).toDF(('feat1', 'feat2', 'value'))
df_other_1 = sc.parallelize([
['a', 0, 1, 0.0],
['a', 1, 3, 0.1],
['a', 3, 10, 1.0],
['c', 0, 10, 0.2],
['c', 10, 25, 0.5]
]).toDF(('feat1', 'lower', 'upper', 'score'))
df_other_2 = sc.parallelize([
['b', 0, 4, 0.1],
['b', 4, 20, 0.5],
['b', 20, 30, 1.0],
['d', 0, 5, 0.05],
['d', 5, 22, 0.9]
]).toDF(('feat1', 'lower', 'upper', 'score'))
对于 df
的每一行,我想从 df_other_1
和 df_other_2
中收集 feat1
和 feat2
的唯一上限值,即第一行,唯一值是 (1, 3, 10, 4, 20, 30)。然后,我会将它们排序为 (30, 20, 10, 4, 3, 1) 并添加到前面,第一个数字之上的一个数字。 df
会变成这样:
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
['c', 'd', 3, [26, 25, 22, 10, 5]]
]).toDF(('feat1', 'feat2', 'value', 'lst'))
然后,对于 df
的每一行和 lst
的每个相应值,我想从两个 df_other_1
中计算 score
的总和和 df_other_2
,其中 lst
的每个值都在 upper
和 lower
范围内。我的目标是在总分高于某个阈值(例如 1.4)的每个 lst
中找到最低值。
下面是总分的计算方法。因此,对于 df
的第一行,lst
的第一个值是 31。在 df_other_1
中 feat1
,它在最高桶之上,因此它会得到一个分数共 1 个。df_other_2
相同。因此,总分将是 1+1 =2。对于值 10(同样是第一行),总分将为 1 + 0.5 = 1.5。
这就是 df
最后的样子:
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))
实际上我正在寻找这些目标值 4
和 25
。中间步骤并不重要。
============================================= =============================
这是我到目前为止尝试过的方法:
def get_threshold_for_row(feat1, feat2, threshold):
this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
this_df_other_2 = df_other_2.filter(col('feat1') == feat2)
values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
values_feat_1.append(values_feat_1[-1] + 1)
values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
values_feat_2.append(values_feat_2[-1] + 1)
values = values_feat_1 + values_feat_2
values = list(set(values)) #Keep unique values
values.sort(reverse=True) #Sort from largest to smallest
df_1_score = df_2_score = 0
prev_value = 10000 #Any large number
prev_score = 10000
for value in values:
df_1_score = get_score_for_key(this_df_other_1, 'feat_1', feat_1, value)
df_2_score = get_score_for_key(this_df_other_2, 'feat_1', feat_2, value)
total_score = df_1_score + df_2_score
if total_score < threshold and prev_score >= threshold:
return prev_value
prev_score = total_score
prev_value = value
def is_dataframe_empty(df):
return len(df.take(1)) == 0
def get_score_for_key(scores_df, grouping_key, this_id, value):
if is_dataframe_empty(scores_df):
return 0.0
w = Window.partitionBy([grouping_key]).orderBy(col('upper'))
scores_df_tmp = scores_df.withColumn("prev_value", lead(scores_df.upper).over(w))\
.withColumn("is_last", when(col('prev_value').isNull(), 1).otherwise(0))\
.drop('prev_value')
scores_df_tmp = scores_df_tmp.withColumn("next_value", lag(scores_df_tmp.upper).over(w))\
.withColumn("is_first", when(col('next_value').isNull(), 1).otherwise(0))\
.drop('next_value').cache()
grouping_key_score = scores_df_tmp.filter((col(grouping_key) == this_id) &
(((value >= col('from_value')) & (value < col('to_value'))) |
((value >= col('to_value')) & (col('is_last') == 1)) |
((value < col('from_value')) & (col('is_first') == 1)) |
(col('from_value').isNull()))) \
.withColumn('final_score', when(value <= col('to_value'), col('score')).otherwise(1.0)) \
.collect()[0]['final_score']
return grouping_key_score
df.rdd.map(lambda r: (r['feat_1'], r['feat_2'])) \
.map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4)))
.toDF()
但我得到:
AttributeError: 'Py4JError' object has no attribute 'message'
抱歉这么久 post。有什么想法吗?
I have a dataframe and I want to apply a function to each row. This function depends of other dataframes.
tl;dr 这在 UDF 中是不可能的。
在最广泛的意义上,UDF 是一个函数(实际上是一个 Catalyst 表达式),它接受零个或多个列值(作为列引用)。
如果 UDF 是用户定义的聚合函数 (UDAF),则 UDF 只能处理在最广泛的情况下可能是整个 DataFrame 的记录。
如果你想在一个 UDF 中处理多个 DataFrame,你必须 join
DataFrames 有你想用于 UDF 的列。