将在不同工作机器上创建的数据帧连接成一个数据帧

Concatenating dataframes created on different worker machines into a single dataframe

我正在尝试编写这个非常小的程序,旨在 运行 在 Dask 分布式集群上。在每台 worker 机器上都有一个 CSV 文件(目前只有一个),函数文件应该 运行 每个 worker 和 return 从所述文件构建的数据框。但是,我想将它们作为单个数据框进行分析。我正在尝试从工作人员编辑的那两个 return 构建一个数据框。我怎样才能做到这一点?

def files():
    for file in glob.glob("data*.csv"):
        df = pd.read_csv(file)
        print(df.head())
        column = df["cars"]
        max = column.max()
        print(max)
        return df


client = Client('SCHEDULER:8786')
a = client.submit(files, workers='WORKER1:8786',  pure=False)
b = client.submit(files, workers='WORKER2:8786', pure=False)
c = dd.merge(a, b, left_on=['id', 'cars'], right_on = ['id', 'cars'],  suffixes=['_1', '_2'], how="left")
#c = dd.concat([a,b])

在执行上述代码两次后,每次注释其中一个 C 都会使进程保持打开状态,并且不会 return 编辑或向客户端显示任何内容。

我的示例文件详细如下:

   id  cars
0   1   223
1   2   333
2   3  1933
3   4  2003
4   5   893
5   6  1233
6   7  1933
   id  cars
0  14   224
1  24   334
2  34  1934
3  44  3004
4  54   894
5  64  1234
6  74  1934

   id  cars
0   1   223
1   2   333
2   3  1933
3   4  2003
4   5   893
5   6  1233
6   7  1933

因此我的串联数据框应该有一个 len = len(df_a) + len(df_b)

现在您要求两个工作人员执行相同的任务,即遍历文件并返回第一个文件的数据帧。让我们为此做一个可重现的修复。

这段代码只是为了生成一些虚拟数据:

import pandas as pd
import numpy as np

# let's create two dummy files
for i in range(2):
    df = pd.DataFrame(np.random.rand(10,5), columns=list("abcde"))
    df.to_csv(f'data{i}.csv', index=False)

现在,对于您要提交的主要任务,每个工作人员都需要加载一个单独的文件。然后,对于连接,您可以依赖 dd.from_delayed 它将连接期货列表中的数据帧:

import dask.dataframe as dd
from dask.distributed import Client
from glob import glob

# start a client
client = Client()

def get_df(file):
    df = pd.read_csv(file)
    return df

list_files = glob('data*.csv')
# map each item in the list to get_df
futures = client.map(get_df, list_files)

# transform futures into dask dataframe (it concatenates them automatically)
ddf = dd.from_delayed(futures)

更新:抱歉,我误解了你的情况。因此,您有兴趣合并来自不同工作人员的数据帧,因此一种方法是按如下方式修改原始代码:

a = client.submit(files, workers='WORKER1:8786',  pure=False)
b = client.submit(files, workers='WORKER2:8786', pure=False)
df_a = dd.from_delayed([a])
df_b = dd.from_delayed([b])
df_c = dd.merge(df_a, df_b, left_on=['id', 'cars'], right_on = ['id', 'cars'],  suffixes=['_1', '_2'], how="left")

请注意,在合并之前,您希望将您的期货转换为 dask 数据帧。