如何有条件地规范化 PySpark 中 RDD 列中的值?
How to conditionally normalize values in a RDD Column in PySpark?
我想做什么:
将输入df1转换为所需的输出df2。
我正在尝试使用 python、 在 Spark DataFrame 中的列中 规范化值 当行的值仅高于某个阈值时 .
我有什么 (df1) :
df1 = spark.createDataFrame([ ('A',50,80),
('B',110,90),
('C',150,130),
('D',230,280)
], ["item","X","Y"])
我想要的 (df2) :
df2 = spark.createDataFrame([ ('A',50,80),
('B',107.7,90),
('C',138.5,116.7),
('D',200,200)
], ["item","X","Y"])
到目前为止我做到了:
同时使用以下规范化逻辑:
for Xi > 100
Wi = ( Xi - Min_X) / (Max_X - Min_X) * 100 + Min_X
where Min_X = 100 and Max_X = Max(X) (in that case 230)
else Wi = Xi
和
for Yi > 100
Zi = ( Xi - Min_Y) / (Max_Y - Min_Y) * 100 + Min_Y
where Min_Y = 100 and Max_Y = Max(Y) (in that case 280)
else Zi = Yi
也就是说,每列的归一化应该不同(因为最大值可能因列而异)
我尝试使用以下代码,但遇到结构类型冲突。
Min_X = 100
Max_X = df.select(max(df.X)).alias('max').collect()
df = df.withColumn("X", when(col("X")>100, F.round((col("X") - Min_X)/(Max_X - Min_X),2)).otherwise(col("X")))
TypeError: unsupported operand type(s) for -: 'list' and 'int'
备注:
- 结果可以替换给定列中的当前值
或填充到新列中。
- 如果可能的话,解决方案应该
为 1 或 N 列工作
- 出于性能原因,如果可能,解决方案应坚持
pyspark 库。
提前感谢您的宝贵时间和反馈!
错误是因为 Min_X 是 Int 而 Max_X 是 Row 类型。
试试这个
>>> df1 = spark.createDataFrame([ ('A',50,80), ('B',110,90), ('C',150,130), ('D',230,280)], ["item","X","Y"])
>>> df1.show()
+----+---+---+
|item| X| Y|
+----+---+---+
| A| 50| 80|
| B|110| 90|
| C|150|130|
| D|230|280|
+----+---+---+
>>> Min_X = 100
>>> Max_X = df1.select(max(df1.X)).alias('max').collect()
>>> Max_X
[Row(max(X)=230)]
>>> Max_X = Max_X[0][0]
>>> Max_X
230
>>> df = df1.withColumn("X", when(col("X")>100, round((col("X") - Min_X)/(Max_X - Min_X),2)).otherwise(col("X")))
>>> df.show()
+----+----+---+
|item| X| Y|
+----+----+---+
| A|50.0| 80|
| B|0.08| 90|
| C|0.38|130|
| D| 1.0|280|
+----+----+---+
我想做什么:
将输入df1转换为所需的输出df2。 我正在尝试使用 python、 在 Spark DataFrame 中的列中 规范化值 当行的值仅高于某个阈值时 .
我有什么 (df1) :
df1 = spark.createDataFrame([ ('A',50,80),
('B',110,90),
('C',150,130),
('D',230,280)
], ["item","X","Y"])
我想要的 (df2) :
df2 = spark.createDataFrame([ ('A',50,80),
('B',107.7,90),
('C',138.5,116.7),
('D',200,200)
], ["item","X","Y"])
到目前为止我做到了:
同时使用以下规范化逻辑:
for Xi > 100
Wi = ( Xi - Min_X) / (Max_X - Min_X) * 100 + Min_X
where Min_X = 100 and Max_X = Max(X) (in that case 230)
else Wi = Xi
和
for Yi > 100
Zi = ( Xi - Min_Y) / (Max_Y - Min_Y) * 100 + Min_Y
where Min_Y = 100 and Max_Y = Max(Y) (in that case 280)
else Zi = Yi
也就是说,每列的归一化应该不同(因为最大值可能因列而异)
我尝试使用以下代码,但遇到结构类型冲突。
Min_X = 100
Max_X = df.select(max(df.X)).alias('max').collect()
df = df.withColumn("X", when(col("X")>100, F.round((col("X") - Min_X)/(Max_X - Min_X),2)).otherwise(col("X")))
TypeError: unsupported operand type(s) for -: 'list' and 'int'
备注:
- 结果可以替换给定列中的当前值 或填充到新列中。
- 如果可能的话,解决方案应该 为 1 或 N 列工作
- 出于性能原因,如果可能,解决方案应坚持 pyspark 库。
提前感谢您的宝贵时间和反馈!
错误是因为 Min_X 是 Int 而 Max_X 是 Row 类型。
试试这个
>>> df1 = spark.createDataFrame([ ('A',50,80), ('B',110,90), ('C',150,130), ('D',230,280)], ["item","X","Y"])
>>> df1.show()
+----+---+---+
|item| X| Y|
+----+---+---+
| A| 50| 80|
| B|110| 90|
| C|150|130|
| D|230|280|
+----+---+---+
>>> Min_X = 100
>>> Max_X = df1.select(max(df1.X)).alias('max').collect()
>>> Max_X
[Row(max(X)=230)]
>>> Max_X = Max_X[0][0]
>>> Max_X
230
>>> df = df1.withColumn("X", when(col("X")>100, round((col("X") - Min_X)/(Max_X - Min_X),2)).otherwise(col("X")))
>>> df.show()
+----+----+---+
|item| X| Y|
+----+----+---+
| A|50.0| 80|
| B|0.08| 90|
| C|0.38|130|
| D| 1.0|280|
+----+----+---+