如何按列值在pyspark df中添加更多行
How to add more rows in pyspark df by column value
我被这个问题困扰了很长一段时间,可能使它变得比实际情况更大。我会尽量简化它。
我在我的代码中使用了 pyspark 和数据框函数。
我已经有一个 df 为:
+--+-----+---------+
|id|col1 |col2 |
+--+-----+---------+
|1 |Hello|Repeat |
|2 |Word |Repeat |
|3 |Aux |No repeat|
|4 |Test |Repeat |
+--+-----+---------+
我想要实现的是当 col2 'Repeat' 增加 col1 在 value+1 中的值时重复 df 的行。
+--+-----+---------+------+
|id|col1 |col2 |col3 |
+--+-----+---------+------+
|1 |Hello|Repeat |Hello1|
|1 |Hello|Repeat |Hello2|
|1 |Hello|Repeat |Hello3|
|2 |Word |Repeat |Word1 |
|2 |Word |Repeat |Word2 |
|2 |Word |Repeat |Word3 |
|3 |Aux |No repeat|Aux |
|4 |Test |Repeat |Test1 |
|4 |Test |Repeat |Test2 |
|4 |Test |Repeat |Test3 |
+--+-----+---------+------+
我的第一个方法是使用 withColumn 运算符在 udf 的帮助下创建一个新列:
my_func = udf(lambda words: (words + str(i + 1 for i in range(3))), StringType())
df = df\
.withColumn('col3', when(col('col2') == 'No Repeat', col('col1'))
.otherwise(my_func(col('col1'))))
但是当我在 df.show(10,False) 中评估它时,它抛出了一个错误。我的猜测是因为我无法以这种方式使用 withColumn 函数创建更多行。
所以我决定采用另一种方法,但也没有成功。使用 rdd.flatMap:
test = df.rdd.flatMap(lambda row: (row if (row.col2== 'No Repeat') else (row.col1 + str(i+1) for i in range(3))))
print(test.collect())
但是这里我丢失了 df 模式 并且我不能在 else 条件下抛出整行,它 只抛出 col1 词加上它的迭代器.
你知道解决这个问题的正确方法吗?
最后我的问题是我没有找到一种正确的方法来根据列值创建更多行,因为我对这个世界还很陌生。还有我发现的答案似乎不适合这个问题。
我们将不胜感激。
一种方法是使用条件并分配一个数组,然后分解,
import pyspark.sql.functions as F
(df.withColumn("test",F.when(df['col2']=='Repeat',
F.array([F.lit(str(i)) for i in range(1,4)])).otherwise(F.array(F.lit(''))))
.withColumn("col3",F.explode(F.col("test"))).drop("test")
.withColumn("col3",F.concat(F.col("col1"),F.col("col3")))).show()
@MohammadMurtazaHashmi 建议的更整洁的版本如下所示:
(df.withColumn("test",F.when(df['col2']=='Repeat',
F.array([F.concat(F.col("col1"),F.lit(str(i))) for i in range(1,4)]))
.otherwise(F.array(F.col("col1"))))
.select("id","col1","col2", F.explode("test"))).show()
+---+-----+---------+------+
| id| col1| col2| col3|
+---+-----+---------+------+
| 1|Hello| Repeat|Hello1|
| 1|Hello| Repeat|Hello2|
| 1|Hello| Repeat|Hello3|
| 2| Word| Repeat| Word1|
| 2| Word| Repeat| Word2|
| 2| Word| Repeat| Word3|
| 3| Aux|No repeat| Aux|
| 4| Test| Repeat| Test1|
| 4| Test| Repeat| Test2|
| 4| Test| Repeat| Test3|
+---+-----+---------+------+
我被这个问题困扰了很长一段时间,可能使它变得比实际情况更大。我会尽量简化它。
我在我的代码中使用了 pyspark 和数据框函数。
我已经有一个 df 为:
+--+-----+---------+
|id|col1 |col2 |
+--+-----+---------+
|1 |Hello|Repeat |
|2 |Word |Repeat |
|3 |Aux |No repeat|
|4 |Test |Repeat |
+--+-----+---------+
我想要实现的是当 col2 'Repeat' 增加 col1 在 value+1 中的值时重复 df 的行。
+--+-----+---------+------+
|id|col1 |col2 |col3 |
+--+-----+---------+------+
|1 |Hello|Repeat |Hello1|
|1 |Hello|Repeat |Hello2|
|1 |Hello|Repeat |Hello3|
|2 |Word |Repeat |Word1 |
|2 |Word |Repeat |Word2 |
|2 |Word |Repeat |Word3 |
|3 |Aux |No repeat|Aux |
|4 |Test |Repeat |Test1 |
|4 |Test |Repeat |Test2 |
|4 |Test |Repeat |Test3 |
+--+-----+---------+------+
我的第一个方法是使用 withColumn 运算符在 udf 的帮助下创建一个新列:
my_func = udf(lambda words: (words + str(i + 1 for i in range(3))), StringType())
df = df\
.withColumn('col3', when(col('col2') == 'No Repeat', col('col1'))
.otherwise(my_func(col('col1'))))
但是当我在 df.show(10,False) 中评估它时,它抛出了一个错误。我的猜测是因为我无法以这种方式使用 withColumn 函数创建更多行。
所以我决定采用另一种方法,但也没有成功。使用 rdd.flatMap:
test = df.rdd.flatMap(lambda row: (row if (row.col2== 'No Repeat') else (row.col1 + str(i+1) for i in range(3))))
print(test.collect())
但是这里我丢失了 df 模式 并且我不能在 else 条件下抛出整行,它 只抛出 col1 词加上它的迭代器.
你知道解决这个问题的正确方法吗?
最后我的问题是我没有找到一种正确的方法来根据列值创建更多行,因为我对这个世界还很陌生。还有我发现的答案似乎不适合这个问题。
我们将不胜感激。
一种方法是使用条件并分配一个数组,然后分解,
import pyspark.sql.functions as F
(df.withColumn("test",F.when(df['col2']=='Repeat',
F.array([F.lit(str(i)) for i in range(1,4)])).otherwise(F.array(F.lit(''))))
.withColumn("col3",F.explode(F.col("test"))).drop("test")
.withColumn("col3",F.concat(F.col("col1"),F.col("col3")))).show()
@MohammadMurtazaHashmi 建议的更整洁的版本如下所示:
(df.withColumn("test",F.when(df['col2']=='Repeat',
F.array([F.concat(F.col("col1"),F.lit(str(i))) for i in range(1,4)]))
.otherwise(F.array(F.col("col1"))))
.select("id","col1","col2", F.explode("test"))).show()
+---+-----+---------+------+
| id| col1| col2| col3|
+---+-----+---------+------+
| 1|Hello| Repeat|Hello1|
| 1|Hello| Repeat|Hello2|
| 1|Hello| Repeat|Hello3|
| 2| Word| Repeat| Word1|
| 2| Word| Repeat| Word2|
| 2| Word| Repeat| Word3|
| 3| Aux|No repeat| Aux|
| 4| Test| Repeat| Test1|
| 4| Test| Repeat| Test2|
| 4| Test| Repeat| Test3|
+---+-----+---------+------+