PySpark dataframe column value dependent on value from another row
I have a dataframe like this:
columns = ['manufacturer', 'product_id']
data = [("Factory", "AE222"), ("Sub-Factory-1", "0"), ("Sub-Factory-2", "0"),("Factory", "AE333"), ("Sub-Factory-1", "0"), ("Sub-Factory-2", "0")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)
+-------------+----------+
| manufacturer|product_id|
+-------------+----------+
| Factory| AE222|
|Sub-Factory-1| 0|
|Sub-Factory-2| 0|
| Factory| AE333|
|Sub-Factory-1| 0|
|Sub-Factory-2| 0|
+-------------+----------+
I want to turn it into this:
+-------------+----------+
| manufacturer|product_id|
+-------------+----------+
| Factory| AE222|
|Sub-Factory-1| AE222|
|Sub-Factory-2| AE222|
| Factory| AE333|
|Sub-Factory-1| AE333|
|Sub-Factory-2| AE333|
+-------------+----------+
so that each Sub-Factory row gets its value from the closest Factory row above it. I could solve this with nested for loops, but that would not be very efficient since there could be millions of rows. I have looked into PySpark Window functions but can't really wrap my head around them. Any ideas?
You can use the first function with ignorenulls=True over a Window. But you need to identify the groups of manufacturer rows so you can partition by group. Since you didn't provide any ID column, I use monotonically_increasing_id plus a cumulative conditional sum to create the group column: the sum increments at every Factory row, so in the sample data the rows get groups 1, 1, 1, 2, 2, 2:
from pyspark.sql import functions as F
from pyspark.sql import Window

df1 = df.withColumn(
    "row_id",
    F.monotonically_increasing_id()
).withColumn(
    # cumulative count of Factory rows: all rows under the same Factory share a group
    "group",
    F.sum(F.when(F.col("manufacturer") == "Factory", 1)).over(Window.orderBy("row_id"))
).withColumn(
    # replace the "0" placeholder with the first product_id of the group
    "product_id",
    F.when(
        F.col("product_id") == 0,
        F.first("product_id", ignorenulls=True).over(Window.partitionBy("group").orderBy("row_id"))
    ).otherwise(F.col("product_id"))
).drop("row_id", "group")
df1.show()
#+-------------+----------+
#| manufacturer|product_id|
#+-------------+----------+
#| Factory| AE222|
#|Sub-Factory-1| AE222|
#|Sub-Factory-2| AE222|
#| Factory| AE333|
#|Sub-Factory-1| AE333|
#|Sub-Factory-2| AE333|
#+-------------+----------+
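As a side note, since the "0" values are really placeholders for missing data, a minimal alternative sketch (under the same assumption as above, that monotonically_increasing_id reflects the original row order) is to null them out and forward-fill with last(..., ignorenulls=True), which avoids the intermediate group column:

from pyspark.sql import functions as F
from pyspark.sql import Window

# running window from the start of the frame up to the current row
w = Window.orderBy("row_id").rowsBetween(Window.unboundedPreceding, Window.currentRow)

df2 = df.withColumn(
    "row_id", F.monotonically_increasing_id()
).withColumn(
    # turn the "0" placeholders into real nulls
    "product_id", F.when(F.col("product_id") == "0", F.lit(None)).otherwise(F.col("product_id"))
).withColumn(
    # forward-fill: carry the last non-null product_id down to each row
    "product_id", F.last("product_id", ignorenulls=True).over(w)
).drop("row_id")

df2.show()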