Select rows from Spark DataFrame based on a condition
I have two Spark dataframes:
df1
+---+----+
| id| var|
+---+----+
|323| [a]|
+---+----+
df2
+----+---------+---------+
| src|str_value|num_value|
+----+---------+---------+
| [a]|    ghn12|      0.0|
| [a]|    54fdg|      1.2|
| [a]|    90okl|      0.7|
| [b]|    jh456|      0.5|
| [a]|    ghn12|      0.2|
| [c]|    ghn12|      0.7|
+----+---------+---------+
I need to return the top 3 rows from the df2 dataframe where df1.var == df2.src and df2.num_value has the smallest values. So the desired output is (sorted by num_value):
+----+---------+---------+
| src|str_value|num_value|
+----+---------+---------+
| [a]|    ghn12|      0.0|
| [a]|    ghn12|      0.2|
| [a]|    90okl|      0.7|
+----+---------+---------+
I know how to achieve this with SQL, but I'm struggling to do it with PySpark / Spark SQL.
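For context, one SQL formulation of what I'm after could look like the rough sketch below, assuming both frames are registered as temporary views (the view names and the LIMIT-based top-3 are illustrative; it only matches the window-based answers because df1 has a single var here):

# Rough SQL sketch of the intent; view names are illustrative
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql("""
    SELECT d2.src, d2.str_value, d2.num_value
    FROM df2 d2
    JOIN df1 d1 ON d1.var = d2.src
    ORDER BY d2.num_value
    LIMIT 3
""").show()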
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# Rank rows within each src by ascending num_value
windowSpec = Window.partitionBy("src").orderBy("num_value")
# Keep only the df2 columns after matching df1.var == df2.src
df_joined = df1.join(df2, df1.var == df2.src).drop("var", "id")
# Keep the first 3 rows per src
df_joined.withColumn("row_number", row_number().over(windowSpec)) \
    .filter(col("row_number") < 4).drop("row_number").show()
# +---+---------+---------+
# |src|str_value|num_value|
# +---+---------+---------+
# |[a]| ghn12| 0.0|
# |[a]| ghn12| 0.2|
# |[a]| 90okl| 0.7|
# +---+---------+---------+
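To run the snippet above end to end, the sample frames can be recreated roughly as follows (a sketch; the array-typed var/src columns are an assumption based on the bracketed values shown in the question):

from pyspark.sql import SparkSession

# Sketch of the sample data; array-typed var/src is an assumption
# inferred from the bracketed display in the question.
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(323, ["a"])], ["id", "var"])
df2 = spark.createDataFrame(
    [(["a"], "ghn12", 0.0), (["a"], "54fdg", 1.2), (["a"], "90okl", 0.7),
     (["b"], "jh456", 0.5), (["a"], "ghn12", 0.2), (["c"], "ghn12", 0.7)],
    ["src", "str_value", "num_value"],
)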
I would use the dense_rank window function.
from pyspark.sql import functions as F, Window as W

w = W.partitionBy('src').orderBy('num_value')
df3 = (
    df2
    # left-semi join: keep df2 rows whose src appears in df1.var
    .join(df1, df2.src == df1.var, 'semi')
    # rank rows within each src by ascending num_value
    .withColumn('_rank', F.dense_rank().over(w))
    .filter('_rank <= 3')
    .drop('_rank')
)
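Note that dense_rank and row_number differ when num_value has ties within a src: dense_rank keeps every row in the top 3 distinct values (possibly more than 3 rows), while row_number keeps exactly 3. With the sample data both give the same result; a quick check (assuming df1/df2 as shown above):

# Expected result with the sample data
df3.orderBy('num_value').show()
# +----+---------+---------+
# | src|str_value|num_value|
# +----+---------+---------+
# | [a]|    ghn12|      0.0|
# | [a]|    ghn12|      0.2|
# | [a]|    90okl|      0.7|
# +----+---------+---------+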