sql - 如何连接小于另一个连接键的列
sql - how to join on a column that is less than another join key
我有两个 table,如下所示。我想做的是根据日期和 ID 加入 A 和 B,从 B 获取 value
。问题是,我想使用 add_month(A.Date, -1) = B.month
加入(在 table B 从一个月前)。如果那不可用,我想提前两个月加入 add_month(A.Date, -2) = B.month
我怎样才能在一个查询中实现这一点?最终结果应包含所有 3 行。首选 Spark SQL 而不是 DataFrame API。非常感谢。
Table A:
--------------
ID. |Date |
---------------
A |2022-02 |
---------------
B |2022-02 |
---------------
C |2022-02 |
Table B:
----------------
ID. |Date |value|
-----------------
A |2022-01 | V1
-----------------
B |2022-01 | V2
---------------
C |2021-12 | V3
Expected output:
----------------
ID. |ADate |value|
-----------------
A |2022-02 | V1 --result from join condition add_month(A.Date, -1) = B.month
-----------------
B |2022-02 | V2
---------------
C |2022-02 | V3 ---result from join condition add_month(A.Date, -2) = B.month
我能想到的一种方法是,您可以为 A
列创建所需的 lag date
并与 date
和 B 连接,如下所示 -
数据准备
# Build the two demo tables: A carries (id, month), B carries (id, month, value).
df1 = pd.DataFrame({
    'id': ['A', 'B', 'C'],
    'Date': ['2022-02'] * 3
})
sparkDF1 = sql.createDataFrame(df1)
# Pre-compute the 1-month and 2-month lagged dates on A so the join can
# match either offset against B's Date column.
sparkDF1 = (
    sparkDF1
    .withColumn('date_lag_1', F.add_months(F.col('Date'), -1))
    .withColumn('date_lag_2', F.add_months(F.col('Date'), -2))
)
df2 = pd.DataFrame({
    'id': ['A', 'B', 'C'],
    'Date': ['2022-01', '2022-01', '2021-12'],
    'Value': ['V1', 'V2', 'V3']
})
sparkDF2 = sql.createDataFrame(df2)
sparkDF1.show()
+---+-------+----------+----------+
| id| Date|date_lag_1|date_lag_2|
+---+-------+----------+----------+
| A|2022-02|2022-01-01|2021-12-01|
| B|2022-02|2022-01-01|2021-12-01|
| C|2022-02|2022-01-01|2021-12-01|
+---+-------+----------+----------+
sparkDF2.show()
+---+-------+-----+
| id| Date|Value|
+---+-------+-----+
| A|2022-01| V1|
| B|2022-01| V2|
| C|2021-12| V3|
+---+-------+-----+
加入 - Spark API
# Inner-join A to B on id where B's month equals either the 1-month or the
# 2-month lag of A's month; keep A's id/Date plus B's Value.
same_id = sparkDF1['id'] == sparkDF2['id']
month_matches = (
    (sparkDF1['date_lag_1'] == F.to_date(sparkDF2['date'], 'yyyy-MM'))
    | (sparkDF1['date_lag_2'] == F.to_date(sparkDF2['date'], 'yyyy-MM'))
)
finalDF = (
    sparkDF1
    .join(sparkDF2, same_id & month_matches, 'inner')
    .select(sparkDF1['id'], sparkDF1['Date'], sparkDF2['Value'])
    .orderBy(F.col('id'))
)
finalDF.show()
+---+-------+-----+
| id| Date|Value|
+---+-------+-----+
| A|2022-02| V1|
| B|2022-02| V2|
| C|2022-02| V3|
+---+-------+-----+
加入-SparkSQL
# Expose the DataFrames to Spark SQL as temp views.
# registerTempTable has been deprecated since Spark 2.0;
# createOrReplaceTempView is the drop-in, current replacement.
sparkDF1.createOrReplaceTempView("TB1")
sparkDF2.createOrReplaceTempView("TB2")
# Join on id, matching B's month against either the 1-month or the
# 2-month lag of A's month, in a single query.
sql.sql("""
SELECT
a.ID
,a.DATE
,b.VALUE
FROM TB1 a
INNER JOIN TB2 b
ON a.ID = b.ID
AND (ADD_MONTHS(a.DATE,-1) = B.DATE OR ADD_MONTHS(a.DATE,-2) = B.DATE)
ORDER BY a.ID
""").show()
+---+-------+-----+
| ID| DATE|VALUE|
+---+-------+-----+
| A|2022-02| V1|
| B|2022-02| V2|
| C|2022-02| V3|
+---+-------+-----+
我从来没有找到比将两个 table 中的每一个加入一个公共 table 表达式,在每个日期列上添加一个 LEAD() 表达式,以及最后在复杂条件下使用 equi 谓词对 id 和范围谓词对开始和结束日期进行连接:
-- Generalised approach: instead of hard-coding -1/-2 month offsets, give each
-- row the date of the NEXT row for the same id via LEAD(), then range-join on
-- half-open [dt, next_dt) intervals so every A row pairs with the latest
-- overlapping B row, whatever the gap between the two dates is.
WITH
-- your input , don't use in final query
a(id,dt) AS (
SELECT 'a', DATE '2022-02-01'
UNION ALL SELECT 'b', DATE '2022-02-01'
UNION ALL SELECT 'c', DATE '2022-02-01'
)
,
b(id,dt,val) AS (
SELECT 'a', DATE '2022-01-01','V1'
UNION ALL SELECT 'b', DATE '2022-01-01','V2'
UNION ALL SELECT 'c', DATE '2021-12-01','V3'
)
-- end of your input; real query starts here
-- (when you drop the fixture CTEs above, replace the following comma with "WITH")
,
-- A plus the date of the next A row per id; the far-future sentinel
-- '9999-12-01' keeps the last row's interval open-ended
a_w_next_date AS (
SELECT
*
, LEAD(dt,1,'9999-12-01') OVER(PARTITION BY id ORDER BY dt) AS next_dt
FROM a
)
,
-- same for B: each row is "valid" for the interval [dt, next_dt)
b_w_next_date AS (
SELECT
*
, LEAD(dt,1,'9999-12-01') OVER(PARTITION BY id ORDER BY dt) AS next_dt
FROM b
)
SELECT
a.id
, a.dt
, b.val
FROM a_w_next_date a
JOIN b_w_next_date b
-- equi predicate on id plus overlapping-interval range predicates:
-- A's date falls inside B's validity window, and B's date precedes
-- the start of A's next window
ON a.id = b.id
AND a.dt >= b.dt
AND a.dt < b.next_dt
AND b.dt < a.next_dt
;
-- out id | dt | val
-- out ----+------------+-----
-- out a | 2022-02-01 | V1
-- out b | 2022-02-01 | V2
-- out c | 2022-02-01 | V3
我有两个 table,如下所示。我想做的是根据日期和 ID 加入 A 和 B,从 B 获取 value
。问题是,我想使用 add_month(A.Date, -1) = B.month
加入(在 table B 从一个月前)。如果那不可用,我想提前两个月加入 add_month(A.Date, -2) = B.month
我怎样才能在一个查询中实现这一点?最终结果应包含所有 3 行。首选 Spark SQL 而不是 DataFrame API。非常感谢。
Table A:
--------------
ID. |Date |
---------------
A |2022-02 |
---------------
B |2022-02 |
---------------
C |2022-02 |
Table B:
----------------
ID. |Date |value|
-----------------
A |2022-01 | V1
-----------------
B |2022-01 | V2
---------------
C |2021-12 | V3
Expected output:
----------------
ID. |ADate |value|
-----------------
A |2022-02 | V1 --result from join condition add_month(A.Date, -1) = B.month
-----------------
B |2022-02 | V2
---------------
C |2022-02 | V3 ---result from join condition add_month(A.Date, -2) = B.month
我能想到的一种方法是,您可以为 A
列创建所需的 lag date
并与 date
和 B 连接,如下所示 -
数据准备
# Build the two demo tables: A carries (id, month), B carries (id, month, value).
df1 = pd.DataFrame({
    'id': ['A', 'B', 'C'],
    'Date': ['2022-02'] * 3
})
sparkDF1 = sql.createDataFrame(df1)
# Pre-compute the 1-month and 2-month lagged dates on A so the join can
# match either offset against B's Date column.
sparkDF1 = (
    sparkDF1
    .withColumn('date_lag_1', F.add_months(F.col('Date'), -1))
    .withColumn('date_lag_2', F.add_months(F.col('Date'), -2))
)
df2 = pd.DataFrame({
    'id': ['A', 'B', 'C'],
    'Date': ['2022-01', '2022-01', '2021-12'],
    'Value': ['V1', 'V2', 'V3']
})
sparkDF2 = sql.createDataFrame(df2)
sparkDF1.show()
+---+-------+----------+----------+
| id| Date|date_lag_1|date_lag_2|
+---+-------+----------+----------+
| A|2022-02|2022-01-01|2021-12-01|
| B|2022-02|2022-01-01|2021-12-01|
| C|2022-02|2022-01-01|2021-12-01|
+---+-------+----------+----------+
sparkDF2.show()
+---+-------+-----+
| id| Date|Value|
+---+-------+-----+
| A|2022-01| V1|
| B|2022-01| V2|
| C|2021-12| V3|
+---+-------+-----+
加入 - Spark API
# Inner-join A to B on id where B's month equals either the 1-month or the
# 2-month lag of A's month; keep A's id/Date plus B's Value.
same_id = sparkDF1['id'] == sparkDF2['id']
month_matches = (
    (sparkDF1['date_lag_1'] == F.to_date(sparkDF2['date'], 'yyyy-MM'))
    | (sparkDF1['date_lag_2'] == F.to_date(sparkDF2['date'], 'yyyy-MM'))
)
finalDF = (
    sparkDF1
    .join(sparkDF2, same_id & month_matches, 'inner')
    .select(sparkDF1['id'], sparkDF1['Date'], sparkDF2['Value'])
    .orderBy(F.col('id'))
)
finalDF.show()
+---+-------+-----+
| id| Date|Value|
+---+-------+-----+
| A|2022-02| V1|
| B|2022-02| V2|
| C|2022-02| V3|
+---+-------+-----+
加入-SparkSQL
# Expose the DataFrames to Spark SQL as temp views.
# registerTempTable has been deprecated since Spark 2.0;
# createOrReplaceTempView is the drop-in, current replacement.
sparkDF1.createOrReplaceTempView("TB1")
sparkDF2.createOrReplaceTempView("TB2")
# Join on id, matching B's month against either the 1-month or the
# 2-month lag of A's month, in a single query.
sql.sql("""
SELECT
a.ID
,a.DATE
,b.VALUE
FROM TB1 a
INNER JOIN TB2 b
ON a.ID = b.ID
AND (ADD_MONTHS(a.DATE,-1) = B.DATE OR ADD_MONTHS(a.DATE,-2) = B.DATE)
ORDER BY a.ID
""").show()
+---+-------+-----+
| ID| DATE|VALUE|
+---+-------+-----+
| A|2022-02| V1|
| B|2022-02| V2|
| C|2022-02| V3|
+---+-------+-----+
我从来没有找到比将两个 table 中的每一个加入一个公共 table 表达式,在每个日期列上添加一个 LEAD() 表达式,以及最后在复杂条件下使用 equi 谓词对 id 和范围谓词对开始和结束日期进行连接:
-- Generalised approach: instead of hard-coding -1/-2 month offsets, give each
-- row the date of the NEXT row for the same id via LEAD(), then range-join on
-- half-open [dt, next_dt) intervals so every A row pairs with the latest
-- overlapping B row, whatever the gap between the two dates is.
WITH
-- your input , don't use in final query
a(id,dt) AS (
SELECT 'a', DATE '2022-02-01'
UNION ALL SELECT 'b', DATE '2022-02-01'
UNION ALL SELECT 'c', DATE '2022-02-01'
)
,
b(id,dt,val) AS (
SELECT 'a', DATE '2022-01-01','V1'
UNION ALL SELECT 'b', DATE '2022-01-01','V2'
UNION ALL SELECT 'c', DATE '2021-12-01','V3'
)
-- end of your input; real query starts here
-- (when you drop the fixture CTEs above, replace the following comma with "WITH")
,
-- A plus the date of the next A row per id; the far-future sentinel
-- '9999-12-01' keeps the last row's interval open-ended
a_w_next_date AS (
SELECT
*
, LEAD(dt,1,'9999-12-01') OVER(PARTITION BY id ORDER BY dt) AS next_dt
FROM a
)
,
-- same for B: each row is "valid" for the interval [dt, next_dt)
b_w_next_date AS (
SELECT
*
, LEAD(dt,1,'9999-12-01') OVER(PARTITION BY id ORDER BY dt) AS next_dt
FROM b
)
SELECT
a.id
, a.dt
, b.val
FROM a_w_next_date a
JOIN b_w_next_date b
-- equi predicate on id plus overlapping-interval range predicates:
-- A's date falls inside B's validity window, and B's date precedes
-- the start of A's next window
ON a.id = b.id
AND a.dt >= b.dt
AND a.dt < b.next_dt
AND b.dt < a.next_dt
;
-- out id | dt | val
-- out ----+------------+-----
-- out a | 2022-02-01 | V1
-- out b | 2022-02-01 | V2
-- out c | 2022-02-01 | V3