将两个数据框中的行与最近点连接起来

Joining rows from two dataframes with the closest point

嗨,我是 spark 的新手,我不确定如何处理这个问题。

我有 2 个表(为了便于解释而小得多):

答:Weather Data

B:travel data

我需要通过在同一天开始旅行时找到最近的车站来加入这些表格,并在旅行结束时执行相同的操作。所以最后我得到了旅行开始时和旅行结束时气象站的所有天气数据,每次旅行只有一行来自最近的气象站的数据。

我已经用 geopandas 和 udf 做了类似的事情,但它更容易,因为我正在寻找拦截。像这样:

def find_state_gps(lat, long):
df = gdf_states.apply(lambda x: x["NAME"] if x["geometry"].intersects(Point(long,lat)) else None, axis =1)
idx = df.first_valid_index()
value = df.loc[idx] if idx is not None else "Not in USA territory"
return(value)

state_gps = udf(find_state_gps, StringType())

这次我不知道如何处理逻辑。

我也尝试过执行此查询,但没有成功。

query = "SELECT STATION,\
    NAME,\
    LATITUDE,\
    LONGITUDE,\
    AWND,\
    p.id_trip,\
    p.Latitude,\
    p.Longitude,\
    p.startDate,\
      Abs(p.latitude-LATITUDE)**2 + Abs(p.Longitude-LONGITUDE)**2\
      AS dd\
FROM df2\
CROSS JOIN (\
SELECT id AS id_trip,\
        station_id,\
        Latitude,\
        Longitude,\
        startDate\
 FROM df1\
) AS p ON 1=1\
 ORDER BY dd"

并得到以下错误: 解析异常: 不匹配的输入 '2' 期望 {, ';'}(第 1 行,位置 189)

最后我想要这样的东西,没有重复的旅行。

id started_date finish_date finished weather_station_start weather_station_end more columns about weather for starting and ending trip locations
1 bim baz bim baz bim bim
2 bim baz bim baz bim bim

非常感谢你们的帮助。

我稍微更改了您的示例数据,因为所有站点的坐标都相同:

travel_data  = spark.createDataFrame(
  [
('0','2013-06-01','00:00:01','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
,('1','2013-06-01','00:00:08','-73.98915076','40.7423543','40.74317449','-74.00366443','2013-06-01')
,('2','2013-06-01','00:00:44','-73.99595065','40.69512845','40.69512845','-73.99595065','2013-06-01')
,('3','2013-06-01','00:01:04','-73.98758561','40.73524276','40.6917823','-73.9737299','2013-06-01')
,('4','2013-06-01','00:01:22','-74.01677685','40.70569254','40.68926942','-73.98912867','2013-06-01')
  ], ['id','startDate','startTime','Longitude','Latitude','end station latitude','end station longitude','stopdate']
)

weather_data  = spark.createDataFrame(
  [
 ('USINYWC0003','WHITE PLAINS 3.1 NNW 3, NY US','41.0639','-73.7722','71','2013-06-01','','','','','')
,('USINYWC0002','WHITE PLAINS 3.1 NNW 2, NY US','41.0638','-73.7723','71','2013-06-02','','','','','')
,('USINYWC0001','WHITE PLAINS 3.1 NNW 1, NY US','41.0635','-73.7724','71','2013-06-03','','','','','')
  ], ['STATION','NAME','LATITUDE','LONGITUDE','ELEVATION','DATE','AWND','AWND ATTRIBUTES','DAPR','DAPR ATTRIBUTES','DASE']
)

+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
| id| startDate|startTime|   Longitude|   Latitude|end station latitude|end station longitude|  stopdate|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+
|  0|2013-06-01| 00:00:01|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
|  1|2013-06-01| 00:00:08|-73.98915076| 40.7423543|         40.74317449|         -74.00366443|2013-06-01|
|  2|2013-06-01| 00:00:44|-73.99595065|40.69512845|         40.69512845|         -73.99595065|2013-06-01|
|  3|2013-06-01| 00:01:04|-73.98758561|40.73524276|          40.6917823|          -73.9737299|2013-06-01|
|  4|2013-06-01| 00:01:22|-74.01677685|40.70569254|         40.68926942|         -73.98912867|2013-06-01|
+---+----------+---------+------------+-----------+--------------------+---------------------+----------+

+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
|    STATION|                NAME|LATITUDE|LONGITUDE|ELEVATION|      DATE|AWND|AWND ATTRIBUTES|DAPR|DAPR ATTRIBUTES|DASE|
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+
|USINYWC0003|WHITE PLAINS 3.1 ...| 41.0639| -73.7722|       71|2013-06-01|    |               |    |               |    |
|USINYWC0002|WHITE PLAINS 3.1 ...| 41.0638| -73.7723|       71|2013-06-02|    |               |    |               |    |
|USINYWC0001|WHITE PLAINS 3.1 ...| 41.0635| -73.7724|       71|2013-06-03|    |               |    |               |    |
+-----------+--------------------+--------+---------+---------+----------+----+---------------+----+---------------+----+

然后,交叉连接两个数据帧,以计算start/end点与所有站点之间的正弦距离。不是使用交叉连接的最佳解决方案,但根据数据的大小,它可能是最简单的方法


from pyspark.sql.types import *
from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos, max, min
from pyspark.sql import Window as W

join_df = travel_data\
    .crossJoin(weather_data.select('NAME',col('LATITUDE').alias('st_LAT'),col('LONGITUDE').alias('st_LON'), 'AWND')) \
    .withColumn("dlon_start", radians(col("st_LON")) - radians(col("Longitude"))) \
    .withColumn("dlat_start", radians(col("st_LAT")) - radians(col("Latitude"))) \
    .withColumn("haversine_dist_start", asin(sqrt(
                                         sin(col("dlat_start") / 2) ** 2 + cos(radians(col("Latitude")))
                                         * cos(radians(col("st_LAT"))) * sin(col("dlon_start") / 2) ** 2
                                         )
                                    ) * 2 * 3963 * 5280)\
    .withColumn("dlon_end", radians(col("st_LON")) - radians(col("end station longitude"))) \
    .withColumn("dlat_end", radians(col("st_LAT")) - radians(col("end station latitude"))) \
    .withColumn("haversine_dist_end", asin(sqrt(
                                         sin(col("dlat_end") / 2) ** 2 + cos(radians(col("Latitude")))
                                         * cos(radians(col("st_LAT"))) * sin(col("dlon_end") / 2) ** 2
                                         )
                                    ) * 2 * 3963 * 5280)\
    .drop('dlon_start','dlat_start','dlon_end','dlat_end')

最后,使用window函数选择距离起点最近的车站(result1)和距离终点最近的车站(result2)

W = W.partitionBy("id")

result1 = join_df\
    .withColumn("min_dist_start", min('haversine_dist_start').over(W))\
    .filter(col("min_dist_start") == col('haversine_dist_start'))\
    .select('id',col('startDate').alias('started_date'),col('stopdate').alias('finish_date'),col('NAME').alias('weather_station_start'),col('Latitude').alias('Latitude_start'),col('Longitude').alias('Longitude_start'))



result2 = join_df\
    .withColumn("min_dist_end", min('haversine_dist_end').over(W))\
    .filter(col("min_dist_end") == col('haversine_dist_end'))\
    .select('id', col('NAME').alias('weather_station_end'))

final = result1.join(result2, 'id', 'left')

final.show()

不确定您想要在输出中包含哪些列,但希望这能给您一些见解 输出:

+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|id |started_date|finish_date|weather_station_start        |Latitude_start|Longitude_start|weather_station_end          |
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+
|0  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
|1  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.7423543    |-73.98915076   |WHITE PLAINS 3.1 NNW 1, NY US|
|2  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.69512845   |-73.99595065   |WHITE PLAINS 3.1 NNW 1, NY US|
|3  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.73524276   |-73.98758561   |WHITE PLAINS 3.1 NNW 1, NY US|
|4  |2013-06-01  |2013-06-01 |WHITE PLAINS 3.1 NNW 1, NY US|40.70569254   |-74.01677685   |WHITE PLAINS 3.1 NNW 1, NY US|
+---+------------+-----------+-----------------------------+--------------+---------------+-----------------------------+