有没有办法在没有 Spark UDF 的情况下混合两个具有不同支持的分布数据帧?

Is there a way, without Spark UDFs, to blend two distribution DataFrames that have different support?

我有两个 DataFrame,每个代表一个概率分布,但每行存储一个条目。例如一个是 df1:

item_id |  probability
--------|---------------
  item1 |      0.1
  item2 |      0.2
  item3 |      0.7

还有一个,我们称它为df2:

item_id |  probability
--------|---------------
  item2 |      0.3
  item3 |      0.5
  item4 |      0.2

请注意,这两个项目 space 是不同的。但这没关系,因为这意味着 df1item4 的概率为零,而 df2item1 的概率为零。我想要的是不大量使用自定义 UDF 的代码,它基本上会产生一个 DataFrame,给定一些 alpha 双精度值,混合这两个分布。我可以使用自定义 UDF 编写此代码,但我想知道是否有一些纯基于 Spark SQL 的代码仅使用内置函数来执行此操作。

item_id |  probability
--------|---------------
  item1 |      0.1 * alpha + 0.0 * (1 - alpha)
  item2 |      0.2 * alpha + 0.3 * (1 - alpha)
  item3 |      0.7 * alpha + 0.5 * (1 - alpha)
  item4 |      0.0 * alpha + 0.2 * (1 - alpha)

我认为 SQL 完全可以做到。秘诀是“外部”连接和一些合并魔法来处理空值。

import org.apache.spark.sql.types._


val someData = Seq( ("item1", 0.1 ), ("item2", 0.2),("item3", 0.7) )  
val alpha = 1.1

val someMoreData = Seq( ("item2", 0.3 ), ("item3", 0.5),("item4", 0.2) )  

val df1 = spark.sparkContext.parallelize(someMoreData).toDF( "item_id","probability" )
val df2 = spark.sparkContext.parallelize(someData).toDF( "item_id","probability" )
val prob = df2
  .join(df1, df1("item_id") === df2("item_id"), "outer" )
  .select( 
    coalesce( df1("item_id"), df2("item_id") ).alias("item_id"), 
    coalesce( df1("probability"),lit(0.0)).alias("probability1"),
    coalesce( df2("probability"),lit(0.0)).alias("probability2"), 
    lit(alhpa).alias("alpha") )
prob.show()
+-------+------------+------------+-----+
|item_id|probability1|probability2|alpha|
+-------+------------+------------+-----+
|  item3|         0.5|         0.7|  1.1|
|  item2|         0.3|         0.2|  1.1|
|  item1|         0.0|         0.1|  1.1|
|  item4|         0.2|         0.0|  1.1|
+-------+------------+------------+-----+

prob.select( prob("probability1") * prob("alpha") +  prob("probability2").multiply( lit(1.0) - prob("alpha")), prob("item_id") ).show();
+---------------------------------------------------------+-------+
|((probability1 * alpha) + (probability2 * (1.0 - alpha)))|item_id|
+---------------------------------------------------------+-------+
|                                                     0.48|  item3|
|                                                     0.31|  item2|
|                                     -0.01000000000000...|  item1|
|                                      0.22000000000000003|  item4|
+---------------------------------------------------------+-------+

一个full-join即可解决,操作简单。这里有一个片段。请记住,为了澄清起见,我将两个数据框中的 prob 名称分别更改为 prob1prob2。然后它就像:

alpha = 0.5
df = df1 \
    .join(df2, how='full', on='item_id') \
    .fillna(0) \
    .withColumn('prob', alpha * F.col('prob1') + (1-alpha) * F.col('prob2'))

您的数据样本:

df1 = spark.createDataFrame(data=[
    ('item1', 0.1),
    ('item2', 0.2),
    ('item3', 0.7)
], schema=['item_id', 'prob1'])

df2 = spark.createDataFrame(data=[
    ('item2', 0.3),
    ('item3', 0.5),
    ('item4', 0.2)
], schema=['item_id', 'prob2'])