读取进程并与 dask 并行连接 pandas 数据帧
read process and concatenate pandas dataframe in parallel with dask
我正在尝试并行读取和处理 csv 文件列表,并将输出连接成一个pandas dataframe
以供进一步处理。
我的工作流程包括 3 个步骤:
通过读取 csv 文件列表(全部具有相同结构)创建一系列 pandas 数据框
def loadcsv(filename):
df = pd.read_csv(filename)
return df
通过处理 2 个现有列为每个数据框创建一个新列
def makegeom(a,b):
return 'Point(%s %s)' % (a,b)
def applygeom(df):
df['Geom']= df.apply(lambda row: makegeom(row['Easting'],
row['Northing']),
axis=1)
return df
将所有数据帧连接到一个数据帧中
frames = []
for i in csvtest:
df = applygeom(loadcsv(i))
frames.append(df)
mergedresult1 = pd.concat(frames)
在我的工作流程中,我使用 pandas(每个 csv (15) 文件有超过 >> 2*10^6 个数据点)所以需要一段时间才能完成。我认为这种工作流程应该利用一些并行处理(至少对于 read_csv
和 apply
步骤)所以我尝试了 dask,但我无法正确使用它。在我的尝试中,我的速度没有任何提高。
我做了一个简单的笔记本来复制我正在做的事情:
https://gist.github.com/epifanio/72a48ca970a4291b293851ad29eadb50
我的问题是......使用 dask 完成我的用例的正确方法是什么?
Pandas
在Pandas中我会使用apply方法
In [1]: import pandas as pd
In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})
In [3]: def makegeom(row):
...: a, b = row
...: return 'Point(%s %s)' % (a, b)
...:
In [4]: df.apply(makegeom, axis=1)
Out[4]:
0 Point(1 3)
1 Point(2 2)
2 Point(3 1)
dtype: object
Dask.dataframe
在dask.dataframe中你可以做同样的事情
In [5]: import dask.dataframe as dd
In [6]: ddf = dd.from_pandas(df, npartitions=2)
In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]:
0 Point(1 3)
1 Point(2 2)
2 Point(3 1)
添加新系列
无论哪种情况,您都可以将新系列添加到数据框中
df['geom'] = df[['a', 'b']].apply(makegeom)
创建
如果您有 CSV 数据,那么我会使用 dask.dataframe.read_csv 函数
ddf = dd.read_csv('filenames.*.csv')
如果您有其他类型的数据,那么我会使用 dask.delayed
与此同时,我发现了其他方法(替代 Dask),在我看来相对更容易,可以在 pandas 数据帧上并行执行函数 func
。在这两种情况下,我都利用了 numpy.array_split
方法。
一个使用 python multiprocessing.Pool
、numpy.array_split
和 pandas.concat
的组合,将以这种方式工作:
import numpy as np
def func(array):
# do some computation on the given array
pass
def parallelize_dataframe(df, func, n_cores=72):
df_split = np.array_split(df, n_cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
另一个是使用功能强大但简单的 ray
集群(如果您可以 运行 多台机器上的代码,这将非常有用):
# connect to a ray cluster
#
import ray
ray.init(address="auto", redis_password="5241590000000000")
import numpy as np
@ray.remote
def func(df):
# do some computation on the given dataframe
pass
df_split = np.array_split(df, 288)
result = pd.concat(ray.get([func.remote(i) for i in df_split]))
上述方法对于简单的方法func
非常有效,其中计算可以使用 numpy 进行,返回的产品可以连接回 pandas 数据框 - 对于执行更简单文件操作的方法我也发现有用 parmap.map
- 但对于这个 S.O 来说是 off-topic。问题。
我正在尝试并行读取和处理 csv 文件列表,并将输出连接成一个pandas dataframe
以供进一步处理。
我的工作流程包括 3 个步骤:
通过读取 csv 文件列表(全部具有相同结构)创建一系列 pandas 数据框
def loadcsv(filename): df = pd.read_csv(filename) return df
通过处理 2 个现有列为每个数据框创建一个新列
def makegeom(a,b): return 'Point(%s %s)' % (a,b)
def applygeom(df): df['Geom']= df.apply(lambda row: makegeom(row['Easting'], row['Northing']), axis=1) return df
将所有数据帧连接到一个数据帧中
frames = [] for i in csvtest: df = applygeom(loadcsv(i)) frames.append(df) mergedresult1 = pd.concat(frames)
在我的工作流程中,我使用 pandas(每个 csv (15) 文件有超过 >> 2*10^6 个数据点)所以需要一段时间才能完成。我认为这种工作流程应该利用一些并行处理(至少对于 read_csv
和 apply
步骤)所以我尝试了 dask,但我无法正确使用它。在我的尝试中,我的速度没有任何提高。
我做了一个简单的笔记本来复制我正在做的事情:
https://gist.github.com/epifanio/72a48ca970a4291b293851ad29eadb50
我的问题是......使用 dask 完成我的用例的正确方法是什么?
Pandas
在Pandas中我会使用apply方法
In [1]: import pandas as pd
In [2]: df = pd.DataFrame({'a': [1, 2, 3], 'b': [3, 2, 1]})
In [3]: def makegeom(row):
...: a, b = row
...: return 'Point(%s %s)' % (a, b)
...:
In [4]: df.apply(makegeom, axis=1)
Out[4]:
0 Point(1 3)
1 Point(2 2)
2 Point(3 1)
dtype: object
Dask.dataframe
在dask.dataframe中你可以做同样的事情
In [5]: import dask.dataframe as dd
In [6]: ddf = dd.from_pandas(df, npartitions=2)
In [7]: ddf.apply(makegeom, axis=1).compute()
Out[7]:
0 Point(1 3)
1 Point(2 2)
2 Point(3 1)
添加新系列
无论哪种情况,您都可以将新系列添加到数据框中
df['geom'] = df[['a', 'b']].apply(makegeom)
创建
如果您有 CSV 数据,那么我会使用 dask.dataframe.read_csv 函数
ddf = dd.read_csv('filenames.*.csv')
如果您有其他类型的数据,那么我会使用 dask.delayed
与此同时,我发现了其他方法(替代 Dask),在我看来相对更容易,可以在 pandas 数据帧上并行执行函数 func
。在这两种情况下,我都利用了 numpy.array_split
方法。
一个使用 python multiprocessing.Pool
、numpy.array_split
和 pandas.concat
的组合,将以这种方式工作:
import numpy as np
def func(array):
# do some computation on the given array
pass
def parallelize_dataframe(df, func, n_cores=72):
df_split = np.array_split(df, n_cores)
pool = Pool(n_cores)
df = pd.concat(pool.map(func, df_split))
pool.close()
pool.join()
return df
另一个是使用功能强大但简单的 ray
集群(如果您可以 运行 多台机器上的代码,这将非常有用):
# connect to a ray cluster
#
import ray
ray.init(address="auto", redis_password="5241590000000000")
import numpy as np
@ray.remote
def func(df):
# do some computation on the given dataframe
pass
df_split = np.array_split(df, 288)
result = pd.concat(ray.get([func.remote(i) for i in df_split]))
上述方法对于简单的方法func
非常有效,其中计算可以使用 numpy 进行,返回的产品可以连接回 pandas 数据框 - 对于执行更简单文件操作的方法我也发现有用 parmap.map
- 但对于这个 S.O 来说是 off-topic。问题。