Spark:如何根据两列将实例拆分为Postivie/Negative个样本

Spark: How to split an instance into Postivie/Negative Samples according to two columns

我有以下数据框:

df = sc.parallelize([(1, 2, 3, '2','1','1'), (4, 5, 6, '3','2','1')]).toDF(['ID1', 'ID2', 'ID3','Impressions','Clicks','ImpressionsMinusClicks'])
df.show()

我想将它转换成这个(但是不知道如何以及是否应用 split()explode() 来实现这个):

这里的关键是基本上复制每个实例以匹配# of Impressions(例如10个印象实例变成10行),然后在这些行中标记它们#点击次数作为正例,其余行标记#印象 - 作为负面例子的点击次数。总结一下:一个实例有 10 次展示和 3 次点击。我想将它转换成 10 行,3 个正样本(“1”代表点击)和 7 个负样本(“0”代表 impressed/not 点击)。目的是将其用于输入分类模型,例如朴素贝叶斯或逻辑回归。这个的出处是Kaggle KDD Cup 2012数据集。

您确实可以在 UDF 的结果上使用 explode,它会产生一系列 "events" - 1 表示点击事件,0 表示非点击印象事件:

// We create a UDF which expects two columns (imps and clicks) as input, 
// and returns an array of "is clicked" (0 or 1) integers
val toClickedEvents = udf[Array[Int], Int, Int] {
  case (imps, clicks) => {
    // First, we map the number of imps (e.g. 3) into a sequence
    // of "imps" indices starting from zero; Each one would later
    // represent a single impression "event"
    val impsIndices = (0 until imps)

    // we map each impression "event", represented by its index, 
    // into a 1 or a 0: depending if that event had a matching click;
    // we do that by assigning "1" to indices lower than the number of clicks
    // and "0" for the rest
    val clickIndicatorPerImp = impsIndices.map(index => if (clicks > index) 1 else 0)

    // finally we just convert into an array, to comply with the UDF signature
    clickIndicatorPerImp.toArray
  }
}

// explode the result of the UDF and calculate ImpressedNotClicked
df.withColumn("Clicked", explode(toClickedEvents($"Impressions", $"Clicks")))
  .select($"ID1", $"ID2", $"ID3", $"Clicked", abs($"Clicked" - lit(1)) as "ImpressedNotClicked")

注意:原始 post 被标记为 scala;如果您可以将其转换为python,请随时编辑

根据@TzachZohar 提供的答案和帮助,我找到了一个适用于 PySpark 的有效解决方案 "ugly"。如果您有更优雅的方式,请告诉我或post回答或评论!

    # The first UDF we create is to map the number of impressions into a sequence
    impsIndexed = udf(lambda Impressions: range(0,Impressions), ArrayType(IntegerType()))

    # The second UDF is to map each impression into a 1 or 0 depending if that event had a matching click. So far this is equivalent to Tzach's solution, with the exception that I was not able to package both operations into a single UDF.
    IndicesToClicks = udf(lambda Impressed, Clicks: 1 if Impressed < Clicks else 0, IntegerType())

    # Next, I take the input data frame and apply the first UDF to it, creating a new data frame.
    df_new = df.withColumn('Impressed', explode(impsInd('Impressions')))
    df_new.show()

    # Lastly, I take this new data frame and apply the second UDF to it, achieving the final data frame.
    df_fin = df_new.withColumn('Clicked', toClicks('Impressed','Clicks')).select('ID1','ID2','ID3','Clicked')
    df_fin.show()