破折号 - map_partition
Dask - map_partition
我有一个带有纬度和经度集的 Dask DataFrame(~32m 记录)。我正在尝试使用如下函数计算 lat/lon 之间的距离:
import numpy as np
from geopy import distance
def calc_distance(df, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
if df[lat_col_name_1] != np.nan and df[lon_col_name_1] != np.nan and df[lat_col_name_2] != np.nan and df[lon_col_name_2] != np.nan:
return distance.distance((df[lat_col_name_1], df[lon_col_name_1]), (df[lat_col_name_2], df[lon_col_name_2])).miles
else:
return np.nan
我尝试使用 map_partitions 调用此函数(创建索引和距离的 DataFrame 以及使用 assign 调用 map_paritions。我想使用 assign 这样我就可以避免加入DataFrames 重新组合在一起(看起来很昂贵)。它不喜欢 np.nan 检查。我得到
ValueError: The truth value of a Series is ambiguous. Use a.empty,
a.bool(), a.item(), a.any() or a.all().
我有空 lat/lon 的记录,所以我需要在计算距离时考虑到这一点。
使用map_partitions
distance = big_df.map_partitions(calc_distance,
lat_col_name_1='latitude_1',
lon_col_name_1='longitude_1',
lat_col_name_2='latitude_2',
lon_col_name_2='longitude_2',
meta={'distance': np.float64})
使用map_partitions并赋值
def calc_distance_miles(lat1, lon1, lat2, lon2):
if lat1 != np.nan and lon1 != np.nan and lat2 != np.nan and lon2 != np.nan:
return distance.distance((lat1, lon1), (lat2, lon2)).miles
else:
return np.nan
big_df = big_df.map_partitions(lambda df: df.assign(
distance=calc_distance_miles(df['latitude_1'], df['longitude_1'], df['latitude_2'], df['longitude_2'])
), meta={'distance': np.float64}
)
map_partitions
不同于 df.apply - 函数 calc_distance
正在使用 dask.dataframe 的分区调用,其类型为 pd.DataFrame。
因此,df[lat_col_name_1]
是一个系列,df[lat_col_name_1] != np.nan
是一个布尔系列(它总是 return 这个错误 - 参见 )。
有比按元素计算距离更快的数组方法,但是 dask.dataframe 类似于您要尝试做的事情是使用 map_partitions 然后应用:
def calc_distance(series, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
if series[
[lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2]
].notnull().all():
return distance.distance(
(series[lat_col_name_1], series[lon_col_name_1]),
(series[lat_col_name_2], series[lon_col_name_2]),
).miles
else:
return np.nan
def calc_distance_df(df, **kwargs):
return df.apply(calc_distance, axis=1, **kwargs)
distances = big_df.map_partitions(
calc_distance_df,
meta=np.float64,
lat_col_name_1=lat_col_name_1,
lon_col_name_1=lon_col_name_1,
lat_col_name_2=lat_col_name_2,
lon_col_name_2=lon_col_name_2,
)
我有一个带有纬度和经度集的 Dask DataFrame(~32m 记录)。我正在尝试使用如下函数计算 lat/lon 之间的距离:
import numpy as np
from geopy import distance
def calc_distance(df, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
if df[lat_col_name_1] != np.nan and df[lon_col_name_1] != np.nan and df[lat_col_name_2] != np.nan and df[lon_col_name_2] != np.nan:
return distance.distance((df[lat_col_name_1], df[lon_col_name_1]), (df[lat_col_name_2], df[lon_col_name_2])).miles
else:
return np.nan
我尝试使用 map_partitions 调用此函数(创建索引和距离的 DataFrame 以及使用 assign 调用 map_paritions。我想使用 assign 这样我就可以避免加入DataFrames 重新组合在一起(看起来很昂贵)。它不喜欢 np.nan 检查。我得到
ValueError: The truth value of a Series is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
我有空 lat/lon 的记录,所以我需要在计算距离时考虑到这一点。
使用map_partitions
distance = big_df.map_partitions(calc_distance,
lat_col_name_1='latitude_1',
lon_col_name_1='longitude_1',
lat_col_name_2='latitude_2',
lon_col_name_2='longitude_2',
meta={'distance': np.float64})
使用map_partitions并赋值
def calc_distance_miles(lat1, lon1, lat2, lon2):
if lat1 != np.nan and lon1 != np.nan and lat2 != np.nan and lon2 != np.nan:
return distance.distance((lat1, lon1), (lat2, lon2)).miles
else:
return np.nan
big_df = big_df.map_partitions(lambda df: df.assign(
distance=calc_distance_miles(df['latitude_1'], df['longitude_1'], df['latitude_2'], df['longitude_2'])
), meta={'distance': np.float64}
)
map_partitions
不同于 df.apply - 函数 calc_distance
正在使用 dask.dataframe 的分区调用,其类型为 pd.DataFrame。
因此,df[lat_col_name_1]
是一个系列,df[lat_col_name_1] != np.nan
是一个布尔系列(它总是 return 这个错误 - 参见
有比按元素计算距离更快的数组方法,但是 dask.dataframe 类似于您要尝试做的事情是使用 map_partitions 然后应用:
def calc_distance(series, lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2):
if series[
[lat_col_name_1, lon_col_name_1, lat_col_name_2, lon_col_name_2]
].notnull().all():
return distance.distance(
(series[lat_col_name_1], series[lon_col_name_1]),
(series[lat_col_name_2], series[lon_col_name_2]),
).miles
else:
return np.nan
def calc_distance_df(df, **kwargs):
return df.apply(calc_distance, axis=1, **kwargs)
distances = big_df.map_partitions(
calc_distance_df,
meta=np.float64,
lat_col_name_1=lat_col_name_1,
lon_col_name_1=lon_col_name_1,
lat_col_name_2=lat_col_name_2,
lon_col_name_2=lon_col_name_2,
)