How to conditionally normalize values in an RDD Column in PySpark?

What I want to do:

Transform the input df1 into the desired output df2. Using Python, I am trying to normalize the values in a column of a Spark DataFrame, but only for rows whose value is above a certain threshold.


What I have (df1):

df1 = spark.createDataFrame([('A', 50, 80),
                             ('B', 110, 90),
                             ('C', 150, 130),
                             ('D', 230, 280)
                            ], ["item", "X", "Y"])

What I want (df2):

df2 = spark.createDataFrame([('A', 50, 80),
                             ('B', 107.7, 90),
                             ('C', 138.5, 116.7),
                             ('D', 200, 200)
                            ], ["item", "X", "Y"])

What I have done so far:

I am using the following normalization logic for both columns:

for Xi > 100
Wi = (Xi - Min_X) / (Max_X - Min_X) * 100 + Min_X
where Min_X = 100 and Max_X = max(X) (in this case 230)
else Wi = Xi

for Yi > 100
Zi = (Yi - Min_Y) / (Max_Y - Min_Y) * 100 + Min_Y
where Min_Y = 100 and Max_Y = max(Y) (in this case 280)
else Zi = Yi

That is, the normalization has to be different for each column (since the maximum can differ from column to column).
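
As a quick sanity check of the X formula in plain Python (using the values from df1 and df2 above):

Min_X, Max_X = 100, 230
for x in (110, 150, 230):
    w = (x - Min_X) / (Max_X - Min_X) * 100 + Min_X
    print(x, round(w, 1))   # 110 -> 107.7, 150 -> 138.5, 230 -> 200.0, matching df2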

I tried the following code, but I run into a type conflict:

Min_X = 100
Max_X = df.select(max(df.X)).alias('max').collect()
df = df.withColumn("X", when(col("X")>100, F.round((col("X") - Min_X)/(Max_X - Min_X),2)).otherwise(col("X")))

TypeError: unsupported operand type(s) for -: 'list' and 'int'

Thanks in advance for your time and feedback!


The error occurs because Min_X is an int while Max_X is a list of Row objects (that is what collect() returns).

Try this:

>>> from pyspark.sql.functions import col, when, max, round
>>> df1 = spark.createDataFrame([ ('A',50,80), ('B',110,90), ('C',150,130), ('D',230,280)], ["item","X","Y"])
>>> df1.show()
+----+---+---+
|item|  X|  Y|
+----+---+---+
|   A| 50| 80|
|   B|110| 90|
|   C|150|130|
|   D|230|280|
+----+---+---+

>>> Min_X = 100
>>> Max_X = df1.select(max(df1.X)).alias('max').collect()
>>> Max_X
[Row(max(X)=230)]
>>> Max_X = Max_X[0][0]
>>> Max_X
230
>>> df = df1.withColumn("X", when(col("X")>100, round((col("X") - Min_X)/(Max_X - Min_X),2)).otherwise(col("X")))
>>> df.show()
+----+----+---+
|item|   X|  Y|
+----+----+---+
|   A|50.0| 80|
|   B|0.08| 90|
|   C|0.38|130|
|   D| 1.0|280|
+----+----+---+
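
Note that this only rescales X to the 0-1 range; to reproduce df2 exactly, the * 100 + Min_X part of the original formula still has to be applied, and Y needs its own max. A minimal sketch of the full version, assuming the same df1 and an active SparkSession:

from pyspark.sql import functions as F

df2 = df1
for c in ["X", "Y"]:
    min_c = 100                             # the threshold doubles as the per-column Min
    max_c = df2.agg(F.max(c)).first()[0]    # scalar max of the column (230 for X, 280 for Y)
    scaled = (F.col(c) - min_c) / (max_c - min_c) * 100 + min_c
    df2 = df2.withColumn(c, F.when(F.col(c) > 100, F.round(scaled, 1))
                             .otherwise(F.col(c)))

df2.show()
# expected values: A -> 50.0/80.0, B -> 107.7/90.0, C -> 138.5/116.7, D -> 200.0/200.0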