使用 PySpark 从 Lat/Lon 列创建 LineString
Create LineString from Lat/Lon columns using PySpark
我有一个 PySpark 数据框,其中包含 Lat/Lon 个不同轨迹的点,由“trajectories_id”列标识。
trajectory_id
latitude
longitude
1
45
5
1
45
6
1
45
7
2
46
5
2
46
6
2
46
7
我想要做的是为每个 trajectory_id 提取一个 LineString 并将其存储在另一个数据框中,其中每一行代表一个带有“id”和“geometry”列的轨迹。在此示例中,输出应为:
trajectory_id
geometry
1
LINESTRING (5 45, 6 45, 7 45)
2
LINESTRING (5 46, 6 46, 7 46)
这与 this question 中的问题类似,但在我的情况下,我需要使用 PySpark。
我试过以下方法:
import pandas as pd
from shapely.geometry import Point,LineString
df = pd.DataFrame([[1, 45,5], [1, 45,6], [1, 45,7],[2, 46,5], [2, 46,6], [2, 46,7]], columns=['trajectory_id', 'latitude','longitude'])
df1 = spark.createDataFrame(df)
idx_ = df1.select("trajectory_id").rdd.flatMap(lambda x: x).distinct().collect()
geo_df = pd.DataFrame(index=range(len(idx_)),columns=['geometry','trajectory_id'])
k=0
for i in idx_:
df2=df1.filter(F.col("trajectory_id").isin(i)).toPandas()
df2['points']=df2[["longitude", "latitude"]].apply(Point, axis=1)
geo_df.geometry.iloc[k]=str(LineString(df2['points']))
geo_df['trajectory_id'].iloc[k]=i
k=k+1
此代码有效,但在我的任务中,我正在处理更多轨迹(> 200 万),这需要很长时间,因为我在每次迭代中转换为 Pandas。
有没有一种方法可以更有效地获得相同的输出?
如前所述,我知道我应该避免使用 toPandas() (and/or collect() ),尤其是在 for 循环
中
您可以使用 pyspark SQL 的本机函数来执行此操作。
import pyspark.sql.functions as func
long_lat_df = df.withColumn('joined_long_lat', func.concat(func.col("longitude"), func.lit(" "), func.col("latitude")));
grouped_df = long_lat_df .groupby('trajectory_id').agg(func.collect_list('joined_long_lat').alias("geometry"))
final_df = grouped_df.withColumn('geometry', func.concat_ws(", ", func.col("geometry")));
我有一个 PySpark 数据框,其中包含 Lat/Lon 个不同轨迹的点,由“trajectories_id”列标识。
trajectory_id | latitude | longitude |
---|---|---|
1 | 45 | 5 |
1 | 45 | 6 |
1 | 45 | 7 |
2 | 46 | 5 |
2 | 46 | 6 |
2 | 46 | 7 |
我想要做的是为每个 trajectory_id 提取一个 LineString 并将其存储在另一个数据框中,其中每一行代表一个带有“id”和“geometry”列的轨迹。在此示例中,输出应为:
trajectory_id | geometry |
---|---|
1 | LINESTRING (5 45, 6 45, 7 45) |
2 | LINESTRING (5 46, 6 46, 7 46) |
这与 this question 中的问题类似,但在我的情况下,我需要使用 PySpark。
我试过以下方法:
import pandas as pd
from shapely.geometry import Point,LineString
df = pd.DataFrame([[1, 45,5], [1, 45,6], [1, 45,7],[2, 46,5], [2, 46,6], [2, 46,7]], columns=['trajectory_id', 'latitude','longitude'])
df1 = spark.createDataFrame(df)
idx_ = df1.select("trajectory_id").rdd.flatMap(lambda x: x).distinct().collect()
geo_df = pd.DataFrame(index=range(len(idx_)),columns=['geometry','trajectory_id'])
k=0
for i in idx_:
df2=df1.filter(F.col("trajectory_id").isin(i)).toPandas()
df2['points']=df2[["longitude", "latitude"]].apply(Point, axis=1)
geo_df.geometry.iloc[k]=str(LineString(df2['points']))
geo_df['trajectory_id'].iloc[k]=i
k=k+1
此代码有效,但在我的任务中,我正在处理更多轨迹(> 200 万),这需要很长时间,因为我在每次迭代中转换为 Pandas。 有没有一种方法可以更有效地获得相同的输出? 如前所述,我知道我应该避免使用 toPandas() (and/or collect() ),尤其是在 for 循环
中您可以使用 pyspark SQL 的本机函数来执行此操作。
import pyspark.sql.functions as func
long_lat_df = df.withColumn('joined_long_lat', func.concat(func.col("longitude"), func.lit(" "), func.col("latitude")));
grouped_df = long_lat_df .groupby('trajectory_id').agg(func.collect_list('joined_long_lat').alias("geometry"))
final_df = grouped_df.withColumn('geometry', func.concat_ws(", ", func.col("geometry")));