将在不同工作机器上创建的数据帧连接成一个数据帧
Concatenating dataframes created on different worker machines into a single dataframe
我正在尝试编写这个非常小的程序,旨在 运行 在 Dask 分布式集群上。在每台 worker 机器上都有一个 CSV 文件(目前只有一个),函数文件应该 运行 每个 worker 和 return 从所述文件构建的数据框。但是,我想将它们作为单个数据框进行分析。我正在尝试从工作人员编辑的那两个 return 构建一个数据框。我怎样才能做到这一点?
def files():
for file in glob.glob("data*.csv"):
df = pd.read_csv(file)
print(df.head())
column = df["cars"]
max = column.max()
print(max)
return df
client = Client('SCHEDULER:8786')
a = client.submit(files, workers='WORKER1:8786', pure=False)
b = client.submit(files, workers='WORKER2:8786', pure=False)
c = dd.merge(a, b, left_on=['id', 'cars'], right_on = ['id', 'cars'], suffixes=['_1', '_2'], how="left")
#c = dd.concat([a,b])
在执行上述代码两次后,每次注释其中一个 C 都会使进程保持打开状态,并且不会 return 编辑或向客户端显示任何内容。
我的示例文件详细如下:
id cars
0 1 223
1 2 333
2 3 1933
3 4 2003
4 5 893
5 6 1233
6 7 1933
id cars
0 14 224
1 24 334
2 34 1934
3 44 3004
4 54 894
5 64 1234
6 74 1934
id cars
0 1 223
1 2 333
2 3 1933
3 4 2003
4 5 893
5 6 1233
6 7 1933
因此我的串联数据框应该有一个 len = len(df_a) + len(df_b)
现在您要求两个工作人员执行相同的任务,即遍历文件并返回第一个文件的数据帧。让我们为此做一个可重现的修复。
这段代码只是为了生成一些虚拟数据:
import pandas as pd
import numpy as np
# let's create two dummy files
for i in range(2):
df = pd.DataFrame(np.random.rand(10,5), columns=list("abcde"))
df.to_csv(f'data{i}.csv', index=False)
现在,对于您要提交的主要任务,每个工作人员都需要加载一个单独的文件。然后,对于连接,您可以依赖 dd.from_delayed
它将连接期货列表中的数据帧:
import dask.dataframe as dd
from dask.distributed import Client
from glob import glob
# start a client
client = Client()
def get_df(file):
df = pd.read_csv(file)
return df
list_files = glob('data*.csv')
# map each item in the list to get_df
futures = client.map(get_df, list_files)
# transform futures into dask dataframe (it concatenates them automatically)
ddf = dd.from_delayed(futures)
更新:抱歉,我误解了你的情况。因此,您有兴趣合并来自不同工作人员的数据帧,因此一种方法是按如下方式修改原始代码:
a = client.submit(files, workers='WORKER1:8786', pure=False)
b = client.submit(files, workers='WORKER2:8786', pure=False)
df_a = dd.from_delayed([a])
df_b = dd.from_delayed([b])
df_c = dd.merge(df_a, df_b, left_on=['id', 'cars'], right_on = ['id', 'cars'], suffixes=['_1', '_2'], how="left")
请注意,在合并之前,您希望将您的期货转换为 dask 数据帧。
我正在尝试编写这个非常小的程序,旨在 运行 在 Dask 分布式集群上。在每台 worker 机器上都有一个 CSV 文件(目前只有一个),函数文件应该 运行 每个 worker 和 return 从所述文件构建的数据框。但是,我想将它们作为单个数据框进行分析。我正在尝试从工作人员编辑的那两个 return 构建一个数据框。我怎样才能做到这一点?
def files():
for file in glob.glob("data*.csv"):
df = pd.read_csv(file)
print(df.head())
column = df["cars"]
max = column.max()
print(max)
return df
client = Client('SCHEDULER:8786')
a = client.submit(files, workers='WORKER1:8786', pure=False)
b = client.submit(files, workers='WORKER2:8786', pure=False)
c = dd.merge(a, b, left_on=['id', 'cars'], right_on = ['id', 'cars'], suffixes=['_1', '_2'], how="left")
#c = dd.concat([a,b])
在执行上述代码两次后,每次注释其中一个 C 都会使进程保持打开状态,并且不会 return 编辑或向客户端显示任何内容。
我的示例文件详细如下:
id cars
0 1 223
1 2 333
2 3 1933
3 4 2003
4 5 893
5 6 1233
6 7 1933
id cars
0 14 224
1 24 334
2 34 1934
3 44 3004
4 54 894
5 64 1234
6 74 1934
id cars
0 1 223
1 2 333
2 3 1933
3 4 2003
4 5 893
5 6 1233
6 7 1933
因此我的串联数据框应该有一个 len = len(df_a) + len(df_b)
现在您要求两个工作人员执行相同的任务,即遍历文件并返回第一个文件的数据帧。让我们为此做一个可重现的修复。
这段代码只是为了生成一些虚拟数据:
import pandas as pd
import numpy as np
# let's create two dummy files
for i in range(2):
df = pd.DataFrame(np.random.rand(10,5), columns=list("abcde"))
df.to_csv(f'data{i}.csv', index=False)
现在,对于您要提交的主要任务,每个工作人员都需要加载一个单独的文件。然后,对于连接,您可以依赖 dd.from_delayed
它将连接期货列表中的数据帧:
import dask.dataframe as dd
from dask.distributed import Client
from glob import glob
# start a client
client = Client()
def get_df(file):
df = pd.read_csv(file)
return df
list_files = glob('data*.csv')
# map each item in the list to get_df
futures = client.map(get_df, list_files)
# transform futures into dask dataframe (it concatenates them automatically)
ddf = dd.from_delayed(futures)
更新:抱歉,我误解了你的情况。因此,您有兴趣合并来自不同工作人员的数据帧,因此一种方法是按如下方式修改原始代码:
a = client.submit(files, workers='WORKER1:8786', pure=False)
b = client.submit(files, workers='WORKER2:8786', pure=False)
df_a = dd.from_delayed([a])
df_b = dd.from_delayed([b])
df_c = dd.merge(df_a, df_b, left_on=['id', 'cars'], right_on = ['id', 'cars'], suffixes=['_1', '_2'], how="left")
请注意,在合并之前,您希望将您的期货转换为 dask 数据帧。