在一列中随机 select x(x 是固定的) 值,并在 pyspark 数据框中用 0 替换它
Randomly select x(x is fixed) values in a column and replace it with 0 in pyspark dataframe
我想将 table 中的 2 个客户值随机替换为 0。table 可以有尽可能多的行,但 table 的主键是客户。
我如何在 pyspark 中执行此操作?
样本:
输入
| cust | value|
| c1 | 2 |
| c2 | 4 |
| c4 | 6 |
| c3 | 8 |
| c5 | 10 |
| c6 | 12 |
输出
| cust | value|
| c1 | 2 |
| c2 | 0 |
| c4 | 6 |
| c3 | 8 |
| c5 | 0 |
| c6 | 12 |
你可以使用一个技巧:用row_number
定义一个索引列,然后根据这个新列过滤N行。
import random
import pyspark.sql.functions as F
from pyspark.sql.window import Window
def replace_n_values_with_zeroes(df, n=2, col_name='value', seed=42):
# retrieve number of rows
n_rows = df.count()
# take n random values from idx
random.seed(seed)
idx_rows = random.sample(list(range(1, n_rows+1)), n)
# define index column
w = Window.orderBy(F.lit(1))
df = df.withColumn('idx', F.row_number().over(w))
# replace some values with zeroes
df = df.withColumn(col_name, F.when(F.col('idx').isin(idx_rows), 0)
.otherwise(F.col(col_name)))
df = df.drop('idx')
return df
例子
df = spark.createDataFrame([
["c1", 2],
["c2", 4],
["c4", 6],
["c3", 8],
["c5", 10],
["c6", 12]
], ["cust", "value"])
replace_n_values_with_zeroes(df).show()
+----+-----+
|cust|value|
+----+-----+
| c1| 0|
| c2| 4|
| c4| 6|
| c3| 8|
| c5| 10|
| c6| 0|
+----+-----+
我想将 table 中的 2 个客户值随机替换为 0。table 可以有尽可能多的行,但 table 的主键是客户。
我如何在 pyspark 中执行此操作?
样本:
输入
| cust | value|
| c1 | 2 |
| c2 | 4 |
| c4 | 6 |
| c3 | 8 |
| c5 | 10 |
| c6 | 12 |
输出
| cust | value|
| c1 | 2 |
| c2 | 0 |
| c4 | 6 |
| c3 | 8 |
| c5 | 0 |
| c6 | 12 |
你可以使用一个技巧:用row_number
定义一个索引列,然后根据这个新列过滤N行。
import random
import pyspark.sql.functions as F
from pyspark.sql.window import Window
def replace_n_values_with_zeroes(df, n=2, col_name='value', seed=42):
# retrieve number of rows
n_rows = df.count()
# take n random values from idx
random.seed(seed)
idx_rows = random.sample(list(range(1, n_rows+1)), n)
# define index column
w = Window.orderBy(F.lit(1))
df = df.withColumn('idx', F.row_number().over(w))
# replace some values with zeroes
df = df.withColumn(col_name, F.when(F.col('idx').isin(idx_rows), 0)
.otherwise(F.col(col_name)))
df = df.drop('idx')
return df
例子
df = spark.createDataFrame([
["c1", 2],
["c2", 4],
["c4", 6],
["c3", 8],
["c5", 10],
["c6", 12]
], ["cust", "value"])
replace_n_values_with_zeroes(df).show()
+----+-----+
|cust|value|
+----+-----+
| c1| 0|
| c2| 4|
| c4| 6|
| c3| 8|
| c5| 10|
| c6| 0|
+----+-----+