Spark: How to split an instance into Positive/Negative samples according to two columns
I have the following data frame:
df = sc.parallelize([(1, 2, 3, '2','1','1'), (4, 5, 6, '3','2','1')]).toDF(['ID1', 'ID2', 'ID3','Impressions','Clicks','ImpressionsMinusClicks'])
df.show()
I want to convert it into the following (but I don't know how, or whether split() and explode() are the right tools for this):
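The key here is essentially to replicate each instance to match its # of Impressions (e.g. an instance with 10 impressions becomes 10 rows), then mark # of Clicks of those rows as positive examples and the remaining (# of Impressions minus # of Clicks) rows as negative examples. To summarize: one instance has 10 impressions and 3 clicks. I want to convert it into 10 rows, 3 positive samples ("1" for clicked) and 7 negative samples ("0" for impressed/not clicked). The goal is to use this as input to a classification model such as Naive Bayes or Logistic Regression. The data comes from the Kaggle KDD Cup 2012 dataset.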
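The transformation described above can be sketched in plain Python, without Spark, to make the target shape concrete. This is only an illustration: the helper name `expand_instance` and the dict-based row are assumptions made for the example, not part of the dataset or of any Spark API.

```python
def expand_instance(row, impressions_key='Impressions', clicks_key='Clicks'):
    """Replicate one aggregated row into one sample per impression.

    The first `clicks` samples are labelled 1 (clicked), the remaining
    ones 0 (impressed but not clicked). Counts may arrive as strings,
    as in the sample data frame, so they are converted with int().
    """
    n_imps = int(row[impressions_key])
    n_clicks = int(row[clicks_key])
    # Keep only the ID columns; the aggregate counts are replaced by the label.
    ids = {k: v for k, v in row.items() if k.startswith('ID')}
    return [dict(ids, Clicked=1 if i < n_clicks else 0) for i in range(n_imps)]


# Hypothetical instance with 10 impressions and 3 clicks.
instance = {'ID1': 1, 'ID2': 2, 'ID3': 3, 'Impressions': '10', 'Clicks': '3'}
samples = expand_instance(instance)
for sample in samples:
    print(sample)
```

Which of the replicated rows carry the "1" label is arbitrary; only the counts (3 positives, 7 negatives) matter for training a classifier.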
You can indeed use explode on the result of a UDF that produces a sequence of "events": 1 for a click event, 0 for an impression that was not clicked:
import org.apache.spark.sql.functions.{abs, explode, lit, udf}
// The $"..." column syntax also needs the session implicits, which spark-shell imports automatically.
// We create a UDF which expects two columns (imps and clicks) as input,
// and returns an array of "is clicked" (0 or 1) integers
val toClickedEvents = udf[Array[Int], Int, Int] {
  case (imps, clicks) => {
    // First, we map the number of imps (e.g. 3) into a sequence
    // of "imps" indices starting from zero; each one will later
    // represent a single impression "event"
    val impsIndices = (0 until imps)
    // we map each impression "event", represented by its index,
    // into a 1 or a 0, depending on whether that event had a matching click;
    // we do that by assigning "1" to indices lower than the number of clicks
    // and "0" to the rest
    val clickIndicatorPerImp = impsIndices.map(index => if (clicks > index) 1 else 0)
    // finally we just convert into an array, to comply with the UDF signature
    clickIndicatorPerImp.toArray
  }
}
// explode the result of the UDF and calculate ImpressedNotClicked;
// the sample columns hold strings, so they are cast to int explicitly
df.withColumn("Clicked", explode(toClickedEvents($"Impressions".cast("int"), $"Clicks".cast("int"))))
  .select($"ID1", $"ID2", $"ID3", $"Clicked", abs($"Clicked" - lit(1)) as "ImpressedNotClicked")
Note: the original post was tagged scala; if you can convert this to python, feel free to edit.
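Based on the answer and help provided by @TzachZohar, I found a working, if "ugly", solution for PySpark. If you have a more elegant way, please let me know or post an answer or comment!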
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType
# The first UDF maps the number of impressions into a sequence of indices 0..n-1.
# int() is needed because the sample columns hold strings.
impsIndexed = udf(lambda Impressions: list(range(int(Impressions))), ArrayType(IntegerType()))
# The second UDF maps each impression index into a 1 or 0, depending on whether that event had a matching click. So far this is equivalent to Tzach's solution, with the exception that I was not able to package both operations into a single UDF.
IndicesToClicks = udf(lambda Impressed, Clicks: 1 if Impressed < int(Clicks) else 0, IntegerType())
# Next, I take the input data frame and apply the first UDF to it, creating a new data frame with one row per impression.
df_new = df.withColumn('Impressed', explode(impsIndexed('Impressions')))
df_new.show()
# Lastly, I take this new data frame and apply the second UDF to it, producing the final data frame.
df_fin = df_new.withColumn('Clicked', IndicesToClicks('Impressed', 'Clicks')).select('ID1', 'ID2', 'ID3', 'Clicked')
df_fin.show()
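For what it's worth, the two steps can probably be folded into a single UDF, mirroring the Scala answer. The following is a sketch, not a tested drop-in: the names `click_labels` and `explode_to_samples` are made up for this example, and it assumes the column names from the sample data frame. The Spark imports sit inside the second function so that the pure-Python part can be tried on its own.

```python
def click_labels(impressions, clicks):
    """Return one label per impression: 1 for the first `clicks` entries, 0 for the rest."""
    # The sample columns hold strings, so convert before counting.
    n_imps, n_clicks = int(impressions), int(clicks)
    return [1 if i < n_clicks else 0 for i in range(n_imps)]


def explode_to_samples(df):
    """Expand each row of `df` into one row per impression with a 'Clicked' label."""
    # Imported here so that click_labels can be exercised without Spark installed.
    from pyspark.sql.functions import explode, udf
    from pyspark.sql.types import ArrayType, IntegerType

    to_clicked_events = udf(click_labels, ArrayType(IntegerType()))
    return (df.withColumn('Clicked', explode(to_clicked_events('Impressions', 'Clicks')))
              .select('ID1', 'ID2', 'ID3', 'Clicked'))


# Quick check of the labelling logic on the two sample rows.
print(click_labels('2', '1'))
print(click_labels('3', '2'))
```

With the data frame from the question, `explode_to_samples(df).show()` would be the intended call.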