使用 Dask 下载、处理并保存为 csv
Using Dask to download, process, and save to csv
问题
我的部分工作流程涉及下载数十万个文件、解析数据,然后在本地保存到 csv。我正在尝试使用 Dask 设置此工作流程,但它似乎没有并行处理。 Dask 仪表板显示每个工人的 cpu% 很低,任务选项卡是空的。状态也不显示任何内容。 htop
似乎不会一次处理超过 1 或 2 个“运行”。我不确定如何从这里开始。
相关:How should I write multiple CSV files efficiently using dask.dataframe?(此问题基于的旧问题)
例子
from dask.delayed import delayed
from dask import compute
from dask.distributed import Client, progress
import pandas as pd
import wget
import zipfile
import multiprocessing
def get_fn(dat):
### Download file and unzip based on input dat
url = f"http://www.urltodownloadfrom.com/{dat['var1']}/{dat['var2']}.csv"
wget.download(url)
indat = unzip()
### Process file
outdat = proc_dat(indat)
### Save file
outdat.to_csv('file_path')
### Trash collection with custom download fn
delete_downloads()
if __name__ == '__main__':
### Dask setup
NCORES = multiprocessing.cpu_count() - 1
client = Client(n_workers=NCORES, threads_per_worker=1)
### Build df of needed dates and variables
beg_dat = "2020-01-01"
end_dat = "2020-01-31"
date_range = pd.date_range(beg_dat, end_dat)
var = ["var1", "var2"]
lst_ = [(x, y) for x in date_range for y in var]
date = [x[0] for x in lst_]
var = [x[1] for x in lst_]
indf = pd.DataFrame({'date': date, 'var': var}).reset_index()
### Group by each row to process
gb = indf.groupby('index')
gb_i = [gb.get_group(x) for x in gb.groups]
### Start dask using delayed
compute([delayed(get_fn)(thisRow) for thisRow in gb_i], scheduler='processes')
仪表板
这一行:
compute([...], scheduler='processes')
您明确使用了一个调度程序 其他 而不是您之前在脚本中设置的分布式调度程序。如果您未在此处指定 scheduler=
,您将使用正确的客户端,因为它已被设置为默认值。您会在仪表板中看到一些东西。
请注意,您可能仍然看不到高 CPU 使用率,因为似乎大部分时间都在等待下载。
问题
我的部分工作流程涉及下载数十万个文件、解析数据,然后在本地保存到 csv。我正在尝试使用 Dask 设置此工作流程,但它似乎没有并行处理。 Dask 仪表板显示每个工人的 cpu% 很低,任务选项卡是空的。状态也不显示任何内容。 htop
似乎不会一次处理超过 1 或 2 个“运行”。我不确定如何从这里开始。
相关:How should I write multiple CSV files efficiently using dask.dataframe?(此问题基于的旧问题)
例子
from dask.delayed import delayed
from dask import compute
from dask.distributed import Client, progress
import pandas as pd
import wget
import zipfile
import multiprocessing
def get_fn(dat):
### Download file and unzip based on input dat
url = f"http://www.urltodownloadfrom.com/{dat['var1']}/{dat['var2']}.csv"
wget.download(url)
indat = unzip()
### Process file
outdat = proc_dat(indat)
### Save file
outdat.to_csv('file_path')
### Trash collection with custom download fn
delete_downloads()
if __name__ == '__main__':
### Dask setup
NCORES = multiprocessing.cpu_count() - 1
client = Client(n_workers=NCORES, threads_per_worker=1)
### Build df of needed dates and variables
beg_dat = "2020-01-01"
end_dat = "2020-01-31"
date_range = pd.date_range(beg_dat, end_dat)
var = ["var1", "var2"]
lst_ = [(x, y) for x in date_range for y in var]
date = [x[0] for x in lst_]
var = [x[1] for x in lst_]
indf = pd.DataFrame({'date': date, 'var': var}).reset_index()
### Group by each row to process
gb = indf.groupby('index')
gb_i = [gb.get_group(x) for x in gb.groups]
### Start dask using delayed
compute([delayed(get_fn)(thisRow) for thisRow in gb_i], scheduler='processes')
仪表板
这一行:
compute([...], scheduler='processes')
您明确使用了一个调度程序 其他 而不是您之前在脚本中设置的分布式调度程序。如果您未在此处指定 scheduler=
,您将使用正确的客户端,因为它已被设置为默认值。您会在仪表板中看到一些东西。
请注意,您可能仍然看不到高 CPU 使用率,因为似乎大部分时间都在等待下载。