优化大型 excel 文件的处理
Optimizing processing of a large excel file
我正在处理一个大型数据集,我需要在其中读取 excel 个文件,然后找到有效数字,但这项任务只需要 500k 数据就需要大量时间。对于有效号码,我使用 google phonelib。处理可以以异步方式完成,因为它们是独立的。
parts = dask.delayed(pd.read_excel)('500k.xlsx')
data = dd.from_delayed(parts)
data['Valid'] = data['Mobile'].apply(lambda x: phonenumbers.is_valid_number(phonenumbers.parse(x)),meta=('Valid','object'))
用于背景
phonenumbers.is_valid_number(phonenumbers.parse('+442083661177'))
输出为 True
我希望输出少于 10 秒,但它需要大约 40 秒
刚刚玩过这个,您可能只需要 repartition
your dataframe 允许计算 运行 并行
我首先生成一些数据:
import csv
import random
with open('tmp.csv', 'w') as fd:
out = csv.writer(fd)
out.writerow(['id', 'number'])
for i in range(500_000):
a = random.randrange(1000, 2999)
b = random.randrange(100_000, 899_999)
out.writerow([i+1, f'+44 {a} {b}'])
请注意,这些大多是有效的英国号码。
然后我 运行 类似于你的代码:
from dask.distributed import Client
import dask.dataframe as dd
import phonenumbers
def fn(num):
return phonenumbers.is_valid_number(phonenumbers.parse(num))
with Client(processes=True):
df = dd.read_csv('tmp.csv')
# repartition to increase parallelism
df = df.repartition(npartitions=8)
df['valid'] = df.number.apply(fn, meta=('valid', 'object'))
out = df.compute()
这在我的笔记本电脑(4 核,8 线程,Linux 5.2.8)上完成大约需要 20 秒,这只是普通循环性能的两倍多一点。这表明 dask 有相当多的 运行 时间开销,因为我希望它比这快得多。如果我删除对 repartition
的调用,它需要的时间比我愿意等待的时间更长,并且 top
只显示一个进程 运行ning
请注意,如果我重写它来做 multiprocessing
中的天真事情,我会得到更好的结果:
from multiprocessing import Pool
import pandas as pd
df = pd.read_csv('tmp.csv')
with Pool(4) as pool:
df['valid'] = pool.map(fn, df['number'])
这将 运行 时间减少到大约 11 秒,而且这里的代码更少作为奖励
我正在处理一个大型数据集,我需要在其中读取 excel 个文件,然后找到有效数字,但这项任务只需要 500k 数据就需要大量时间。对于有效号码,我使用 google phonelib。处理可以以异步方式完成,因为它们是独立的。
parts = dask.delayed(pd.read_excel)('500k.xlsx')
data = dd.from_delayed(parts)
data['Valid'] = data['Mobile'].apply(lambda x: phonenumbers.is_valid_number(phonenumbers.parse(x)),meta=('Valid','object'))
用于背景
phonenumbers.is_valid_number(phonenumbers.parse('+442083661177'))
输出为 True
我希望输出少于 10 秒,但它需要大约 40 秒
刚刚玩过这个,您可能只需要 repartition
your dataframe 允许计算 运行 并行
我首先生成一些数据:
import csv
import random
with open('tmp.csv', 'w') as fd:
out = csv.writer(fd)
out.writerow(['id', 'number'])
for i in range(500_000):
a = random.randrange(1000, 2999)
b = random.randrange(100_000, 899_999)
out.writerow([i+1, f'+44 {a} {b}'])
请注意,这些大多是有效的英国号码。
然后我 运行 类似于你的代码:
from dask.distributed import Client
import dask.dataframe as dd
import phonenumbers
def fn(num):
return phonenumbers.is_valid_number(phonenumbers.parse(num))
with Client(processes=True):
df = dd.read_csv('tmp.csv')
# repartition to increase parallelism
df = df.repartition(npartitions=8)
df['valid'] = df.number.apply(fn, meta=('valid', 'object'))
out = df.compute()
这在我的笔记本电脑(4 核,8 线程,Linux 5.2.8)上完成大约需要 20 秒,这只是普通循环性能的两倍多一点。这表明 dask 有相当多的 运行 时间开销,因为我希望它比这快得多。如果我删除对 repartition
的调用,它需要的时间比我愿意等待的时间更长,并且 top
只显示一个进程 运行ning
请注意,如果我重写它来做 multiprocessing
中的天真事情,我会得到更好的结果:
from multiprocessing import Pool
import pandas as pd
df = pd.read_csv('tmp.csv')
with Pool(4) as pool:
df['valid'] = pool.map(fn, df['number'])
这将 运行 时间减少到大约 11 秒,而且这里的代码更少作为奖励