有没有办法在没有 Spark UDF 的情况下混合两个具有不同支持的分布数据帧?
Is there a way, without Spark UDFs, to blend two distribution DataFrames that have different support?
我有两个 DataFrame
,每个代表一个概率分布,但每行存储一个条目。例如一个是 df1
:
item_id | probability
--------|---------------
item1 | 0.1
item2 | 0.2
item3 | 0.7
还有一个,我们称它为df2
:
item_id | probability
--------|---------------
item2 | 0.3
item3 | 0.5
item4 | 0.2
请注意,这两个项目 space 是不同的。但这没关系,因为这意味着 df1
对 item4
的概率为零,而 df2
对 item1
的概率为零。我想要的是不大量使用自定义 UDF 的代码,它基本上会产生一个 DataFrame
,给定一些 alpha
双精度值,混合这两个分布。我可以使用自定义 UDF 编写此代码,但我想知道是否有一些纯基于 Spark SQL 的代码仅使用内置函数来执行此操作。
item_id | probability
--------|---------------
item1 | 0.1 * alpha + 0.0 * (1 - alpha)
item2 | 0.2 * alpha + 0.3 * (1 - alpha)
item3 | 0.7 * alpha + 0.5 * (1 - alpha)
item4 | 0.0 * alpha + 0.2 * (1 - alpha)
我认为 SQL 完全可以做到。秘诀是“外部”连接和一些合并魔法来处理空值。
import org.apache.spark.sql.types._
val someData = Seq( ("item1", 0.1 ), ("item2", 0.2),("item3", 0.7) )
val alpha = 1.1
val someMoreData = Seq( ("item2", 0.3 ), ("item3", 0.5),("item4", 0.2) )
val df1 = spark.sparkContext.parallelize(someMoreData).toDF( "item_id","probability" )
val df2 = spark.sparkContext.parallelize(someData).toDF( "item_id","probability" )
val prob = df2
.join(df1, df1("item_id") === df2("item_id"), "outer" )
.select(
coalesce( df1("item_id"), df2("item_id") ).alias("item_id"),
coalesce( df1("probability"),lit(0.0)).alias("probability1"),
coalesce( df2("probability"),lit(0.0)).alias("probability2"),
lit(alhpa).alias("alpha") )
prob.show()
+-------+------------+------------+-----+
|item_id|probability1|probability2|alpha|
+-------+------------+------------+-----+
| item3| 0.5| 0.7| 1.1|
| item2| 0.3| 0.2| 1.1|
| item1| 0.0| 0.1| 1.1|
| item4| 0.2| 0.0| 1.1|
+-------+------------+------------+-----+
prob.select( prob("probability1") * prob("alpha") + prob("probability2").multiply( lit(1.0) - prob("alpha")), prob("item_id") ).show();
+---------------------------------------------------------+-------+
|((probability1 * alpha) + (probability2 * (1.0 - alpha)))|item_id|
+---------------------------------------------------------+-------+
| 0.48| item3|
| 0.31| item2|
| -0.01000000000000...| item1|
| 0.22000000000000003| item4|
+---------------------------------------------------------+-------+
一个full-join即可解决,操作简单。这里有一个片段。请记住,为了澄清起见,我将两个数据框中的 prob
名称分别更改为 prob1
和 prob2
。然后它就像:
alpha = 0.5
df = df1 \
.join(df2, how='full', on='item_id') \
.fillna(0) \
.withColumn('prob', alpha * F.col('prob1') + (1-alpha) * F.col('prob2'))
您的数据样本:
df1 = spark.createDataFrame(data=[
('item1', 0.1),
('item2', 0.2),
('item3', 0.7)
], schema=['item_id', 'prob1'])
df2 = spark.createDataFrame(data=[
('item2', 0.3),
('item3', 0.5),
('item4', 0.2)
], schema=['item_id', 'prob2'])
我有两个 DataFrame
,每个代表一个概率分布,但每行存储一个条目。例如一个是 df1
:
item_id | probability
--------|---------------
item1 | 0.1
item2 | 0.2
item3 | 0.7
还有一个,我们称它为df2
:
item_id | probability
--------|---------------
item2 | 0.3
item3 | 0.5
item4 | 0.2
请注意,这两个项目 space 是不同的。但这没关系,因为这意味着 df1
对 item4
的概率为零,而 df2
对 item1
的概率为零。我想要的是不大量使用自定义 UDF 的代码,它基本上会产生一个 DataFrame
,给定一些 alpha
双精度值,混合这两个分布。我可以使用自定义 UDF 编写此代码,但我想知道是否有一些纯基于 Spark SQL 的代码仅使用内置函数来执行此操作。
item_id | probability
--------|---------------
item1 | 0.1 * alpha + 0.0 * (1 - alpha)
item2 | 0.2 * alpha + 0.3 * (1 - alpha)
item3 | 0.7 * alpha + 0.5 * (1 - alpha)
item4 | 0.0 * alpha + 0.2 * (1 - alpha)
我认为 SQL 完全可以做到。秘诀是“外部”连接和一些合并魔法来处理空值。
import org.apache.spark.sql.types._
val someData = Seq( ("item1", 0.1 ), ("item2", 0.2),("item3", 0.7) )
val alpha = 1.1
val someMoreData = Seq( ("item2", 0.3 ), ("item3", 0.5),("item4", 0.2) )
val df1 = spark.sparkContext.parallelize(someMoreData).toDF( "item_id","probability" )
val df2 = spark.sparkContext.parallelize(someData).toDF( "item_id","probability" )
val prob = df2
.join(df1, df1("item_id") === df2("item_id"), "outer" )
.select(
coalesce( df1("item_id"), df2("item_id") ).alias("item_id"),
coalesce( df1("probability"),lit(0.0)).alias("probability1"),
coalesce( df2("probability"),lit(0.0)).alias("probability2"),
lit(alhpa).alias("alpha") )
prob.show()
+-------+------------+------------+-----+
|item_id|probability1|probability2|alpha|
+-------+------------+------------+-----+
| item3| 0.5| 0.7| 1.1|
| item2| 0.3| 0.2| 1.1|
| item1| 0.0| 0.1| 1.1|
| item4| 0.2| 0.0| 1.1|
+-------+------------+------------+-----+
prob.select( prob("probability1") * prob("alpha") + prob("probability2").multiply( lit(1.0) - prob("alpha")), prob("item_id") ).show();
+---------------------------------------------------------+-------+
|((probability1 * alpha) + (probability2 * (1.0 - alpha)))|item_id|
+---------------------------------------------------------+-------+
| 0.48| item3|
| 0.31| item2|
| -0.01000000000000...| item1|
| 0.22000000000000003| item4|
+---------------------------------------------------------+-------+
一个full-join即可解决,操作简单。这里有一个片段。请记住,为了澄清起见,我将两个数据框中的 prob
名称分别更改为 prob1
和 prob2
。然后它就像:
alpha = 0.5
df = df1 \
.join(df2, how='full', on='item_id') \
.fillna(0) \
.withColumn('prob', alpha * F.col('prob1') + (1-alpha) * F.col('prob2'))
您的数据样本:
df1 = spark.createDataFrame(data=[
('item1', 0.1),
('item2', 0.2),
('item3', 0.7)
], schema=['item_id', 'prob1'])
df2 = spark.createDataFrame(data=[
('item2', 0.3),
('item3', 0.5),
('item4', 0.2)
], schema=['item_id', 'prob2'])