当需要计算dataframe列时使用dask进行并行计算
Parallel computation with dask when dataframe column needs to be computed
我有一个 3.6 亿记录的观鸟数据框
我想以分布式方式使用 dask
计算每种鸟类的质心作为年的函数。
我想做:
df2 = df.groupby(['VERNACULARNAME', 'yearday']).mean()
但我需要先计算 yearday
,我不知道是否有办法使用 dask
即时执行此操作。我希望 dask 可以将新数据持久化给 dask
工作人员,但是当我尝试时:
def yearday(r):
r['yearday'] = dt.datetime(r['YEAR'], r['MONTH'], r['DAY']).timetuple().tm_yday
return r
df.apply(yearday, axis=1).persist()
它不缩放。
如果有人想实际尝试,可以这样加载数据:
import dask.dataframe as dd
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
storage_options={'anon': True, 'use_ssl': False})
注意:尽管我将此数据集称为 EOD_CLO_2016.parq.gz
,但它在 S3 存储桶中的许多对象上分块以促进并行化。每个块都是 gzip 压缩的。
有没有办法以分布式方式即时进行此计算,或者在使用 groupby
执行可扩展部分之前,我是否需要编写另一个包含年日列的数据文件?
根据您对 notebook 所做的操作,我将按以下方式修改 groupby
之前的步骤
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
columns=['YEAR', 'MONTH', 'DAY', 'DECIMALLATITUDE',
'DECIMALLONGITUDE', 'VERNACULARNAME'],
storage_options={'anon': True, 'use_ssl': False})
df = df.map_partitions(lambda df: df.assign(yearday=pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']]).dt.dayofyear,
lat=np.deg2rad(df['DECIMALLATITUDE'].values),
lon=np.deg2rad(df['DECIMALLONGITUDE'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8'})
df = df.map_partitions(lambda df :df.assign(x=np.cos(df['lat'].values) * np.cos(df['lon'].values),
y=np.cos(df['lat'].values) * np.sin(df['lon'].values),
z=np.sin(df['lat'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8',
'x':'f8', 'y':'f8', 'z':'f8'})
更新: 我不确定将数据存储为单个压缩文件而不是多个文件是否是个好主意。您考虑过不同的选择吗?
更新 2: 鉴于从度数到弧度的转换是线性的,您可以计算 lon, lat
然后 x,y,z
在 groupby
之后.
我有一个 3.6 亿记录的观鸟数据框
dask
计算每种鸟类的质心作为年的函数。
我想做:
df2 = df.groupby(['VERNACULARNAME', 'yearday']).mean()
但我需要先计算 yearday
,我不知道是否有办法使用 dask
即时执行此操作。我希望 dask 可以将新数据持久化给 dask
工作人员,但是当我尝试时:
def yearday(r):
r['yearday'] = dt.datetime(r['YEAR'], r['MONTH'], r['DAY']).timetuple().tm_yday
return r
df.apply(yearday, axis=1).persist()
它不缩放。
如果有人想实际尝试,可以这样加载数据:
import dask.dataframe as dd
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
storage_options={'anon': True, 'use_ssl': False})
注意:尽管我将此数据集称为 EOD_CLO_2016.parq.gz
,但它在 S3 存储桶中的许多对象上分块以促进并行化。每个块都是 gzip 压缩的。
有没有办法以分布式方式即时进行此计算,或者在使用 groupby
执行可扩展部分之前,我是否需要编写另一个包含年日列的数据文件?
根据您对 notebook 所做的操作,我将按以下方式修改 groupby
之前的步骤
df = dd.read_parquet('s3://esipfed/ebird/EOD_CLO_2016.parq.gz',
columns=['YEAR', 'MONTH', 'DAY', 'DECIMALLATITUDE',
'DECIMALLONGITUDE', 'VERNACULARNAME'],
storage_options={'anon': True, 'use_ssl': False})
df = df.map_partitions(lambda df: df.assign(yearday=pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']]).dt.dayofyear,
lat=np.deg2rad(df['DECIMALLATITUDE'].values),
lon=np.deg2rad(df['DECIMALLONGITUDE'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8'})
df = df.map_partitions(lambda df :df.assign(x=np.cos(df['lat'].values) * np.cos(df['lon'].values),
y=np.cos(df['lat'].values) * np.sin(df['lon'].values),
z=np.sin(df['lat'].values)),
meta={'YEAR':'i8', 'MONTH':'i8', 'DAY':'i8',
'DECIMALLATITUDE':'f8','DECIMALLONGITUDE':'f8',
'VERNACULARNAME':'object',
'yearday':'i8', 'lat':'f8', 'lon':'f8',
'x':'f8', 'y':'f8', 'z':'f8'})
更新: 我不确定将数据存储为单个压缩文件而不是多个文件是否是个好主意。您考虑过不同的选择吗?
更新 2: 鉴于从度数到弧度的转换是线性的,您可以计算 lon, lat
然后 x,y,z
在 groupby
之后.