Joining rows from two dataframes with the closest point
Hi, I'm new to Spark and I'm not sure how to approach this problem.
I have 2 tables (much smaller here for the sake of explanation):
A: weather data
B: travel data
I need to join these tables by finding the closest station to the point where each trip starts, on the same day, and do the same for the point where it ends. So in the end I would have all the weather data from the station closest to the trip's start and from the station closest to its end, with only one row per trip.
I have done something similar with geopandas and a UDF, but that was easier because I was looking for an intersection. Like this:
from shapely.geometry import Point
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def find_state_gps(lat, long):
    # return the NAME of the first state polygon that contains the point
    df = gdf_states.apply(lambda x: x["NAME"] if x["geometry"].intersects(Point(long, lat)) else None, axis=1)
    idx = df.first_valid_index()
    return df.loc[idx] if idx is not None else "Not in USA territory"

state_gps = udf(find_state_gps, StringType())
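Applied roughly like this (the dataframe and column names here are just an illustration):

trips = trips.withColumn("state", state_gps(col("Latitude"), col("Longitude")))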
This time I don't know how to approach the logic.
I also tried running this query, without success:
query = "SELECT STATION,\
NAME,\
LATITUDE,\
LONGITUDE,\
AWND,\
p.id_trip,\
p.Latitude,\
p.Longitude,\
p.startDate,\
Abs(p.latitude-LATITUDE)**2 + Abs(p.Longitude-LONGITUDE)**2\
AS dd\
FROM df2\
CROSS JOIN (\
SELECT id AS id_trip,\
station_id,\
Latitude,\
Longitude,\
startDate\
FROM df1\
) AS p ON 1=1\
ORDER BY dd"
and got the following error:
ParseException:
mismatched input '2' expecting {<EOF>, ';'} (line 1, pos 189)
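(Position 189 lands on the ** in the distance expression: Spark SQL has no ** operator, so that term would have to be written with POW instead, e.g. something like

POW(p.Latitude - LATITUDE, 2) + POW(p.Longitude - LONGITUDE, 2) AS dd

though that is still a squared difference in degrees, not a real distance.)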
In the end I want something like this, with no duplicated trips:
id | started_date | finish_date | finished | weather_station_start | weather_station_end | more columns about weather for starting and ending trip locations |
---|---|---|---|---|---|---|
1 | bim | baz | bim | baz | bim | bim |
2 | bim | baz | bim | baz | bim | bim |
Thanks a lot for your help, guys.
I changed your sample data slightly, because all the stations had identical coordinates:
travel_data = spark.createDataFrame(
[
('0','2013-06-01','00:00:01','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
,('1','2013-06-01','00:00:08','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
,('2','2013-06-01','00:00:44','-73.99595065','40.69512845','40.69512845','-73.99595065','2013-06-01')
,('3','2013-06-01','00:01:04','-73.98758561','40.73524276','40.6917823','-73.9737299','2013-06-01')
,('4','2013-06-01','00:01:22','-74.01677685','40.70569254','40.68926942','-73.98912867','2013-06-01')
], ['id','startDate','startTime','Longitude','Latitude','end station latitude','end station longitude','stopdate']
)
weather_data = spark.createDataFrame(
[
('USINYWC0003','WHITE PLAINS 3.1 NNW 3, NY US','41.0639','-73.7722','71','2013-06-01','','','','','')
,('USINYWC0002','WHITE PLAINS 3.1 NNW 2, NY US','41.0638','-73.7723','71','2013-06-02','','','','','')
,('USINYWC0001','WHITE PLAINS 3.1 NNW 1, NY US','41.0635','-73.7724','71','2013-06-03','','','','','')
], ['STATION','NAME','LATITUDE','LONGITUDE','ELEVATION','DATE','AWND','AWND ATTRIBUTES','DAPR','DAPR ATTRIBUTES','DASE']
)
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
| id| startDate|startTime| Longitude| Latitude|end station latitude|end station longitude| stopdate|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
| 0|2013-06-01| 00:00:01|-73.98915076| 40.7423543| 40.74317449| -74.00366443|2013-06-01|
| 1|2013-06-01| 00:00:08|-73.98915076| 40.7423543| 40.74317449| -74.00366443|2013-06-01|
| 2|2013-06-01| 00:00:44|-73.99595065|40.69512845| 40.69512845| -73.99595065|2013-06-01|
| 3|2013-06-01| 00:01:04|-73.98758561|40.73524276| 40.6917823| -73.9737299|2013-06-01|
| 4|2013-06-01| 00:01:22|-74.01677685|40.70569254| 40.68926942| -73.98912867|2013-06-01|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
| STATION| NAME|LATITUDE|LONGITUDE|ELEVATION| DATE|AWND|AWND ATTRIBUTES|DAPR|DAPR ATTRIBUTES|DASE|
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
|USINYWC0003|WHITE PLAINS 3.1 ...| 41.0639| -73.7722| 71|2013-06-01| | | | | |
|USINYWC0002|WHITE PLAINS 3.1 ...| 41.0638| -73.7723| 71|2013-06-02| | | | | |
|USINYWC0001|WHITE PLAINS 3.1 ...| 41.0635| -73.7724| 71|2013-06-03| | | | | |
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
Then, cross join the two dataframes to compute the haversine distance between the start/end points and every station. A cross join is not the best solution, but depending on the size of your data it may be the simplest approach.
from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos, min
from pyspark.sql import Window as W

# distances come out in feet: 2 * asin(...) * 3963 (earth radius in miles) * 5280 (feet per mile)
join_df = travel_data\
    .crossJoin(weather_data.select('NAME', col('LATITUDE').alias('st_LAT'), col('LONGITUDE').alias('st_LON'), 'AWND')) \
    .withColumn("dlon_start", radians(col("st_LON")) - radians(col("Longitude"))) \
    .withColumn("dlat_start", radians(col("st_LAT")) - radians(col("Latitude"))) \
    .withColumn("haversine_dist_start", asin(sqrt(
        sin(col("dlat_start") / 2) ** 2 + cos(radians(col("Latitude")))
        * cos(radians(col("st_LAT"))) * sin(col("dlon_start") / 2) ** 2
    )) * 2 * 3963 * 5280)\
    .withColumn("dlon_end", radians(col("st_LON")) - radians(col("end station longitude"))) \
    .withColumn("dlat_end", radians(col("st_LAT")) - radians(col("end station latitude"))) \
    .withColumn("haversine_dist_end", asin(sqrt(
        sin(col("dlat_end") / 2) ** 2 + cos(radians(col("end station latitude")))  # note: the end latitude, not the start one
        * cos(radians(col("st_LAT"))) * sin(col("dlon_end") / 2) ** 2
    )) * 2 * 3963 * 5280)\
    .drop('dlon_start', 'dlat_start', 'dlon_end', 'dlat_end')
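The same math can also be factored into a single helper so the formula is only written once (a sketch under the same imports; haversine_ft is just an illustrative name):

def haversine_ft(lat1, lon1, lat2, lon2):
    # great-circle distance in feet between two (lat, lon) Column pairs
    dlat = radians(lat2) - radians(lat1)
    dlon = radians(lon2) - radians(lon1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return asin(sqrt(a)) * 2 * 3963 * 5280

stations = weather_data.select('NAME', col('LATITUDE').alias('st_LAT'), col('LONGITUDE').alias('st_LON'), 'AWND')
join_df = travel_data\
    .crossJoin(stations)\
    .withColumn('haversine_dist_start', haversine_ft(col('Latitude'), col('Longitude'), col('st_LAT'), col('st_LON')))\
    .withColumn('haversine_dist_end', haversine_ft(col('end station latitude'), col('end station longitude'), col('st_LAT'), col('st_LON')))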
Finally, use a window function to pick the station closest to the start point (result1) and the station closest to the end point (result2).
W = W.partitionBy("id")
result1 = join_df\
.withColumn("min_dist_start", min('haversine_dist_start').over(W))\
.filter(col("min_dist_start") == col('haversine_dist_start'))\
.select('id',col('startDate').alias('started_date'),col('stopdate').alias('finish_date'),col('NAME').alias('weather_station_start'),col('Latitude').alias('Latitude_start'),col('Longitude').alias('Longitude_start'))
result2 = join_df\
.withColumn("min_dist_end", min('haversine_dist_end').over(W))\
.filter(col("min_dist_end") == col('haversine_dist_end'))\
.select('id', col('NAME').alias('weather_station_end'))
final = result1.join(result2, 'id', 'left')
final.show()
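One caveat: min + filter keeps every row that ties for the minimum distance, so a trip could still appear twice if two stations are exactly equidistant. If you need a hard guarantee of one row per trip, row_number() picks exactly one (a sketch for the start side; the end side is analogous):

from pyspark.sql.functions import row_number

# rank stations by distance within each trip and keep only the closest one
closest_start = join_df\
    .withColumn('rn', row_number().over(W.orderBy('haversine_dist_start')))\
    .filter(col('rn') == 1)\
    .drop('rn')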
Not sure which columns you want in the output, but hopefully this gives you some insight.
Output:
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|id |started_date|finish_date|weather_station_start |Latitude_start|Longitude_start|weather_station_end |
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|0 |2013-06-01 |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543 |-73.98915076 |WHITE PLAINS 3.1 NNW 1, NY US|
|1 |2013-06-01 |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543 |-73.98915076 |WHITE PLAINS 3.1 NNW 1, NY US|
|2 |2013-06-01 |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.69512845 |-73.99595065 |WHITE PLAINS 3.1 NNW 1, NY US|
|3 |2013-06-01 |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.73524276 |-73.98758561 |WHITE PLAINS 3.1 NNW 1, NY US|
|4 |2013-06-01 |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.70569254 |-74.01677685 |WHITE PLAINS 3.1 NNW 1, NY US|
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
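Since you also want the station's weather for the same day as the trip, the cross join can be narrowed to matching dates before ranking (a sketch; it assumes startDate and DATE use the same date format):

# only pair each trip with stations' weather rows from the trip's start date
stations = weather_data.select('NAME', 'DATE', col('LATITUDE').alias('st_LAT'), col('LONGITUDE').alias('st_LON'), 'AWND')
join_df = travel_data.join(stations, col('startDate') == col('DATE'), 'inner')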