Python threads with a Pandas DataFrame do not improve performance
I have a DataFrame with 200,000 rows that I want to split into several chunks, calling my function S_Function on each chunk.
def S_Function(df):
    # my code here
    return new_df
Main program:
from threading import Thread

N_Threads = 10
Threads = []
Out = []

def worker(chunk):
    # Thread.join() returns None, so each thread appends its own result instead
    Out.append(S_Function(chunk))

size = df.shape[0] // N_Threads
for i in range(N_Threads + 1):  # + 1 so the remainder rows get a chunk too
    begin = i * size
    end = min(df.shape[0], (i + 1) * size)
    Threads.append(Thread(target=worker, args=(df.iloc[begin:end],)))
I start the threads and join them:
for t in Threads:
    t.start()
for t in Threads:
    t.join()  # results were appended to Out by each worker
output = pd.concat(Out)
The code runs fine, but the problem is that using threading.Thread does not reduce the execution time:
Sequential code: 16 minutes
Threaded code: 15 minutes
Can anyone explain what should be improved, and why threading works so poorly here?
Don't use threading when you have to handle CPU-bound operations: in CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so CPU-bound threads take turns instead of running in parallel. To achieve your goal, I think you should use the multiprocessing module, which runs each worker in its own process, each with its own interpreter and GIL.
Try:
import pandas as pd
import numpy as np
import multiprocessing
import time
import functools

# Modify here
CHUNKSIZE = 20000

def S_Function(df, dictionnary):
    # do stuff here
    new_df = df
    return new_df

if __name__ == '__main__':
    # Load your dataframe
    df = pd.DataFrame({'A': np.random.randint(1, 30000000, 200000).tolist()})

    # Create chunks to process
    chunks = (df[i:i+CHUNKSIZE] for i in range(0, len(df), CHUNKSIZE))
    dictionnary = {'k1': 'v1', 'k2': 'v2'}
    s_func = functools.partial(S_Function, dictionnary=dictionnary)

    start = time.time()
    with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
        data = pool.map(s_func, chunks)
    out = pd.concat(data)
    end = time.time()
    print(f"Elapsed time: {end - start:.2f} seconds")