pyspark sql select from other table when is null with condition

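I need to build a select query that fills the null values in dataframe df1 with values from another dataframe.
If value in df1 is null, I need to look up the value in df2 where df2.date = df1.prev_date.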

My current code is:

df1.createOrReplaceTempView("table1")
df2.createOrReplaceTempView("table2")
q="select NVL(b.value, select a.value from table2 a where a.date=b.prev_date_def) as value from table1 b"
df_out = spark.sql(q)

But this code fails because an aggregate function is required.
Error:

pyspark.sql.utils.AnalysisException: Correlated scalar subqueries must be aggregated: Filter (cast(date#269 as date) = outer(prev_date#1500))...

df1

   +------------+-------+------------+
   |    date    |  value|  prev_date |
   +------------+-------+------------+
 1 | 23/11/2021 |  0.141| 24/11/2021 |
 2 | 24/11/2021 |   0.17| 23/11/2021 |
 3 | 25/11/2021 |   null| 24/11/2021 |
 4 | 26/11/2021 |  0.135| 25/11/2021 |

df2

   +------------+-------+
   |    date    |  value|
   +------------+-------+
 1 | 23/11/2021 |   1.00|
 2 | 24/11/2021 |   2.00|
 3 | 25/11/2021 |   3.00|
 4 | 26/11/2021 |   4.00|

Expected df_out

    +------------+-------+------------+
    |    date    |  value|  prev_date |
    +------------+-------+------------+
  1 | 23/11/2021 |  0.141| 24/11/2021 |
  2 | 24/11/2021 |   0.17| 23/11/2021 |
  3 | 25/11/2021 |   2.00| 24/11/2021 |
  4 | 26/11/2021 |  0.135| 25/11/2021 |

A left join + coalesce should do the job:

from pyspark.sql import functions as F

result = df1.join(df2, df1["prev_date"] == df2["date"], "left").select(
    df1["date"],
    F.coalesce(df1["value"], df2["value"]).alias("value"),
    df1["prev_date"]
)

result.show()

#+----------+-----+----------+
#|      date|value| prev_date|
#+----------+-----+----------+
#|24/11/2021| 0.17|23/11/2021|
#|23/11/2021|0.141|24/11/2021|
#|25/11/2021|  2.0|24/11/2021|
#|26/11/2021|0.135|25/11/2021|
#+----------+-----+----------+

Equivalent SparkSQL query:

result = spark.sql("""
SELECT t1.date,
       coalesce(t1.value, t2.value) as value,
       t1.prev_date
FROM   table1 t1
LEFT JOIN table2 t2
ON    t1.prev_date = t2.date
""")
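
Alternatively, join on date and fill the nulls with lag over a window:

from pyspark.sql import Window
from pyspark.sql.functions import coalesce, col, lag

# One global window ordered by date; this assumes prev_date is always the preceding row's date
w = Window.orderBy(col("date"))
new = (
    df1.join(df2.withColumnRenamed("value", "value_2"), on=["date"], how="left")  # join the two dfs, renaming value in df2
    .withColumn("value", coalesce(col("value"), lag("value_2").over(w)))  # fill null values with the previous row's value_2
    .drop("value_2")
)
new.show()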

+-----------+-----+----------+
|       date|value| prev_date|
+-----------+-----+----------+
| 23/11/2021|0.141|24/11/2021|
| 24/11/2021| 0.17|23/11/2021|
| 25/11/2021|  2.0|24/11/2021|
| 26/11/2021|0.135|25/11/2021|
+-----------+-----+----------+
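
One caveat with the lag approach: the window is ordered by the date column as a string, and dd/MM/yyyy strings only sort chronologically while all rows fall in the same month. The plain-Python sketch below illustrates the issue with hypothetical dates that cross a month boundary (they are not from the question's data). In Spark, the analogous remedy would be to order the window by a parsed date, for example with to_date(col("date"), "dd/MM/yyyy").

```python
from datetime import datetime

# Hypothetical dates spanning a month boundary, in the dd/MM/yyyy format used above
dates = ["30/11/2021", "01/12/2021", "29/11/2021"]

# Plain string ordering compares the day digits first, so December sorts before November
as_strings = sorted(dates)

# Parsing to real dates restores chronological order
as_dates = sorted(dates, key=lambda d: datetime.strptime(d, "%d/%m/%Y"))

print(as_strings)
print(as_dates)
```
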