为什么这个函数没有并行?
Why is this function not paralleled?
我有一个数据框 df2
,它是 df
的副本。对于列 col_2
中的每个唯一值 c。我想随机提取 2 行,其在 col_2
中的对应值为 c。如果可用行数小于 2,那么我将提取所有行。然后我在 batch
.
列中将选定的行标记为 1 到 2
您能解释一下为什么我的函数没有对列表 ['a', 'b', 'c']
中的所有值执行这项工作吗?例如,我观察
这意味着值 b
和 c
未由函数实现。
import pandas as pd
import os
from multiprocessing import dummy
from random import sample
core = os.cpu_count()
P = dummy.Pool(processes = core)
data = np.array([(3, 'a'), (2, 'a'), (1, 'b'), (0, 'c'), (2, 'c'), (3, 'c')],
dtype=[('col_1', 'i4'), ('col_2', 'U1')])
df = pd.DataFrame.from_records(data)
df['batch'] = 0
df2 = df.copy()
def func(c):
idx = df.col_2 == c
pop = list(df[idx].index)
m = min(2, len(pop))
r = list(sample(pop, m))
df2.loc[r, 'batch'] = list(range(1, m + 1, 1))
P.map(func, ['a', 'b', 'c'])
df2
我不确定 multiprocessing
是正确答案。保存下面的代码并执行它。我创建了一个包含 40,000,000 条记录和 2500 个组的 DataFrame。在此代码中,您有 2 个用于多处理和单处理的实现。
输出:
Dataframe: 40000000 records for 2500 groups
[MP] Elapsed time: 5.66 seconds
[SP] Elapsed time: 4.48 seconds
import pandas as pd
import numpy as np
import multiprocessing
import time
def func_mp(col, df):
print(f"Group: {col} ({len(df)} records)")
out = df.sample(n=2) if len(df) >= 2 else df
out['batch'] = np.arange(0, len(out))
return out
def func_sp(df):
print(f"Group: {df.name} ({len(df)} records)")
out = df.sample(n=2) if len(df) >= 2 else df
out['batch'] = np.arange(0, len(out))
return out
if __name__ == '__main__':
N = 40000000
col_1 = np.random.randint(1, 1000, N)
col_2 = np.random.randint(0, 2500, N)
df = pd.DataFrame({'col_1': col_1, 'col_2': col_2})
start = time.time()
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
data = pool.starmap(func_mp, df.groupby('col_2'))
out1 = pd.concat(data)
end = time.time()
timemp = end - start
start = time.time()
out2 = df.groupby('col_2', as_index=False).apply(func_sp)
end = time.time()
timesp = end - start
print()
print(f"Dataframe: {len(df)} records for {len(df['col_2'].unique())} groups")
print(f"[MP] Elapsed time: {timemp:.2f} seconds")
print(f"[SP] Elapsed time: {timesp:.2f} seconds")
我有一个数据框 df2
,它是 df
的副本。对于列 col_2
中的每个唯一值 c。我想随机提取 2 行,其在 col_2
中的对应值为 c。如果可用行数小于 2,那么我将提取所有行。然后我在 batch
.
您能解释一下为什么我的函数没有对列表 ['a', 'b', 'c']
中的所有值执行这项工作吗?例如,我观察
这意味着值 b
和 c
未由函数实现。
import pandas as pd
import os
from multiprocessing import dummy
from random import sample
core = os.cpu_count()
P = dummy.Pool(processes = core)
data = np.array([(3, 'a'), (2, 'a'), (1, 'b'), (0, 'c'), (2, 'c'), (3, 'c')],
dtype=[('col_1', 'i4'), ('col_2', 'U1')])
df = pd.DataFrame.from_records(data)
df['batch'] = 0
df2 = df.copy()
def func(c):
idx = df.col_2 == c
pop = list(df[idx].index)
m = min(2, len(pop))
r = list(sample(pop, m))
df2.loc[r, 'batch'] = list(range(1, m + 1, 1))
P.map(func, ['a', 'b', 'c'])
df2
我不确定 multiprocessing
是正确答案。保存下面的代码并执行它。我创建了一个包含 40,000,000 条记录和 2500 个组的 DataFrame。在此代码中,您有 2 个用于多处理和单处理的实现。
输出:
Dataframe: 40000000 records for 2500 groups
[MP] Elapsed time: 5.66 seconds
[SP] Elapsed time: 4.48 seconds
import pandas as pd
import numpy as np
import multiprocessing
import time
def func_mp(col, df):
print(f"Group: {col} ({len(df)} records)")
out = df.sample(n=2) if len(df) >= 2 else df
out['batch'] = np.arange(0, len(out))
return out
def func_sp(df):
print(f"Group: {df.name} ({len(df)} records)")
out = df.sample(n=2) if len(df) >= 2 else df
out['batch'] = np.arange(0, len(out))
return out
if __name__ == '__main__':
N = 40000000
col_1 = np.random.randint(1, 1000, N)
col_2 = np.random.randint(0, 2500, N)
df = pd.DataFrame({'col_1': col_1, 'col_2': col_2})
start = time.time()
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
data = pool.starmap(func_mp, df.groupby('col_2'))
out1 = pd.concat(data)
end = time.time()
timemp = end - start
start = time.time()
out2 = df.groupby('col_2', as_index=False).apply(func_sp)
end = time.time()
timesp = end - start
print()
print(f"Dataframe: {len(df)} records for {len(df['col_2'].unique())} groups")
print(f"[MP] Elapsed time: {timemp:.2f} seconds")
print(f"[SP] Elapsed time: {timesp:.2f} seconds")