pyspark sql select from other table when is null with condition
I need to build a select query that fills null values in one DataFrame (df1) with values from another DataFrame (df2): if a value in df1 is null, I need to look up the value in df2 where df2.date = df1.prev_date.
My current code is:
df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
q="select NVL(b.value, select a.value from table2 a where a.date=b.prev_date_def) as value from table1 b"
df_out = spark.sql(q)
But this query fails because an aggregate function is required. Error:
pyspark.sql.utils.AnalysisException: Correlated scalar subqueries must be aggregated: Filter (cast(date#269 as date) = outer(prev_date#1500))...
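As a side note on the error itself: Spark only accepts a correlated scalar subquery when it can prove the subquery returns a single row, which in practice means wrapping it in an aggregate. A minimal rewrite of the failing query along those lines (assuming the column is `prev_date` as in the sample data rather than `prev_date_def`, and adding the parentheses the subquery needs) might look like:

```sql
SELECT b.date,
       NVL(b.value, (SELECT MAX(a.value)
                     FROM table2 a
                     WHERE a.date = b.prev_date)) AS value,
       b.prev_date
FROM table1 b
```

Since at most one row of table2 matches each prev_date, MAX simply passes that single value through; the join-based answers below are still the more idiomatic fix.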
df1
+------------+-------+------------+
|       date | value |  prev_date |
+------------+-------+------------+
| 23/11/2021 | 0.141 | 24/11/2021 |
| 24/11/2021 |  0.17 | 23/11/2021 |
| 25/11/2021 |  null | 24/11/2021 |
| 26/11/2021 | 0.135 | 25/11/2021 |
+------------+-------+------------+
df2
+------------+-------+
|       date | value |
+------------+-------+
| 23/11/2021 |  1.00 |
| 24/11/2021 |  2.00 |
| 25/11/2021 |  3.00 |
| 26/11/2021 |  4.00 |
+------------+-------+
Expected df_out:
+------------+-------+------------+
|       date | value |  prev_date |
+------------+-------+------------+
| 23/11/2021 | 0.141 | 24/11/2021 |
| 24/11/2021 |  0.17 | 23/11/2021 |
| 25/11/2021 |  2.00 | 24/11/2021 |
| 26/11/2021 | 0.135 | 25/11/2021 |
+------------+-------+------------+
A left join + coalesce should do the job:
from pyspark.sql import functions as F

result = df1.join(df2, df1["prev_date"] == df2["date"], "left").select(
    df1["date"],
    F.coalesce(df1["value"], df2["value"]).alias("value"),
    df1["prev_date"]
)
result.show()
#+----------+-----+----------+
#| date|value| prev_date|
#+----------+-----+----------+
#|24/11/2021| 0.17|23/11/2021|
#|23/11/2021|0.141|24/11/2021|
#|25/11/2021| 2.0|24/11/2021|
#|26/11/2021|0.135|25/11/2021|
#+----------+-----+----------+
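The fill logic the join implements can also be sketched outside Spark, with plain Python dicts standing in for the two DataFrames (dates kept as strings, as in the sample data; `fill_from_prev` is a hypothetical helper name):

```python
# Sample data mirroring df1 and df2 from the question.
df1 = [
    {"date": "23/11/2021", "value": 0.141, "prev_date": "24/11/2021"},
    {"date": "24/11/2021", "value": 0.17,  "prev_date": "23/11/2021"},
    {"date": "25/11/2021", "value": None,  "prev_date": "24/11/2021"},
    {"date": "26/11/2021", "value": 0.135, "prev_date": "25/11/2021"},
]
df2 = {"23/11/2021": 1.0, "24/11/2021": 2.0,
       "25/11/2021": 3.0, "26/11/2021": 4.0}

def fill_from_prev(rows, lookup):
    # Left join on prev_date == date, then coalesce(value, joined value):
    # keep the original value when present, otherwise take df2's value
    # for the prev_date key (None if there is no match, as in a left join).
    out = []
    for r in rows:
        value = r["value"] if r["value"] is not None else lookup.get(r["prev_date"])
        out.append({**r, "value": value})
    return out

result = fill_from_prev(df1, df2)
print([r["value"] for r in result])  # → [0.141, 0.17, 2.0, 0.135]
```

Only the row whose value is null (25/11/2021) is rewritten; all other rows keep their original value, which matches the expected df_out.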
The equivalent SparkSQL query:
result = spark.sql("""
SELECT t1.date,
coalesce(t1.value, t2.value) as value,
t1.prev_date
FROM table1 t1
LEFT JOIN table2 t2
ON t1.prev_date = t2.date
""")
from pyspark.sql.functions import coalesce, col, lag
from pyspark.sql.window import Window

w = Window.partitionBy().orderBy(col("date"))
new = (
    df1.join(df2.withColumnRenamed("value", "value_2"), on=["date"], how="left")  # join the two DataFrames, renaming df2's value column
       .withColumn("value", coalesce(col("value"), lag("value_2").over(w)))  # fill nulls with the previous row's value_2
       .drop("value_2")
)
+----------+-----+----------+
|      date|value| prev_date|
+----------+-----+----------+
|23/11/2021|0.141|24/11/2021|
|24/11/2021| 0.17|23/11/2021|
|25/11/2021|  2.0|24/11/2021|
|26/11/2021|0.135|25/11/2021|
+----------+-----+----------+
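The window-lag variant can likewise be sketched in plain Python: after joining df2 on the same date, each null is filled with the previous row's df2 value when rows are ordered by date, i.e. a one-row shift. Note this only reproduces the expected output because every null row's prev_date happens to be the immediately preceding calendar date; the prev_date-join approach above does not rely on that.

```python
from datetime import datetime

# Sample data mirroring df1 and df2 from the question.
df1 = [
    {"date": "23/11/2021", "value": 0.141, "prev_date": "24/11/2021"},
    {"date": "24/11/2021", "value": 0.17,  "prev_date": "23/11/2021"},
    {"date": "25/11/2021", "value": None,  "prev_date": "24/11/2021"},
    {"date": "26/11/2021", "value": 0.135, "prev_date": "25/11/2021"},
]
df2 = {"23/11/2021": 1.0, "24/11/2021": 2.0,
       "25/11/2021": 3.0, "26/11/2021": 4.0}

# Sort by the actual date (the dd/MM/yyyy strings do not sort
# lexicographically), then fill each null with the PREVIOUS row's df2
# value, mimicking lag("value_2").over(Window.orderBy("date")).
rows = sorted(df1, key=lambda r: datetime.strptime(r["date"], "%d/%m/%Y"))
filled = []
for i, r in enumerate(rows):
    v = r["value"]
    if v is None and i > 0:
        v = df2.get(rows[i - 1]["date"])  # lag of the joined value_2 column
    filled.append({**r, "value": v})

print([r["value"] for r in filled])  # → [0.141, 0.17, 2.0, 0.135]
```

The date-parsing step also flags a caveat in the Spark version above: ordering a window by a dd/MM/yyyy string column is lexicographic, so for data spanning multiple months the column should be cast with to_date first.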