破折号 - map_partition

Dask - map_partition

我有一个带有纬度和经度集的 Dask DataFrame(~32m 记录)。我正在尝试使用如下函数计算 lat/lon 之间的距离:

import numpy as np
from geopy import distance

def calc_distance(df, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
if df[lat_col_name_1] != np.nan and df[lon_col_name_1] != np.nan and df[lat_col_name_2] != np.nan and df[lon_col_name_2] != np.nan:
    return distance.distance((df[lat_col_name_1], df[lon_col_name_1]), (df[lat_col_name_2], df[lon_col_name_2])).miles
else:
    return np.nan 

我尝试使用 map_partitions 调用此函数(创建索引和距离的 DataFrame 以及使用 assign 调用 map_paritions。我想使用 assign 这样我就可以避免加入DataFrames 重新组合在一起(看起来很昂贵)。它不喜欢 np.nan 检查。我得到

ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

我有空 lat/lon 的记录,所以我需要在计算距离时考虑到这一点。

使用map_partitions

distance = big_df.map_partitions(calc_distance, 
                                    lat_col_name_1='latitude_1', 
                                    lon_col_name_1='longitude_1', 
                                    lat_col_name_2='latitude_2', 
                                    lon_col_name_2='longitude_2', 
                                    meta={'distance': np.float64})

使用map_partitions并赋值

def calc_distance_miles(lat1, lon1, lat2, lon2):
    if lat1 != np.nan and lon1 != np.nan and lat2 != np.nan and lon2 != np.nan:
        return distance.distance((lat1, lon1), (lat2, lon2)).miles
    else:
        return np.nan
    

big_df = big_df.map_partitions(lambda df: df.assign(
    distance=calc_distance_miles(df['latitude_1'], df['longitude_1'], df['latitude_2'], df['longitude_2'])
), meta={'distance': np.float64}
)

map_partitions 不同于 df.apply - 函数 calc_distance 正在使用 dask.dataframe 的分区调用,其类型为 pd.DataFrame。

因此,df[lat_col_name_1] 是一个系列,df[lat_col_name_1] != np.nan 是一个布尔系列(它总是 return 这个错误 - 参见 )。

有比按元素计算距离更快的数组方法,但是 dask.dataframe 类似于您要尝试做的事情是使用 map_partitions 然后应用:

def calc_distance(series, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
    if series[
        [lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2]
    ].notnull().all():

        return distance.distance(
            (series[lat_col_name_1], series[lon_col_name_1]),
            (series[lat_col_name_2], series[lon_col_name_2]),
        ).miles

    else:
        return np.nan 

def calc_distance_df(df, **kwargs):
    return df.apply(calc_distance, axis=1, **kwargs)

distances = big_df.map_partitions(
    calc_distance_df,
    meta=np.float64,
    lat_col_name_1=lat_col_name_1,
    lon_col_name_1=lon_col_name_1,
    lat_col_name_2=lat_col_name_2,
    lon_col_name_2=lon_col_name_2,
)