Using joblib makes python consume increasing amounts of RAM as the script runs
I have a large number of files that I need to load, do some processing on, and then store the processed data. For this I have the following code:
from os import listdir
from os.path import dirname, abspath, isfile, join
import pandas as pd
import sys
import time

# Parallel processing
from joblib import Parallel, delayed
import multiprocessing

# Number of cores
TOTAL_NUM_CORES = multiprocessing.cpu_count()

# Path of this script's file
FILES_PATH = dirname(abspath(__file__))

def read_and_convert(f, num_files):
    # Read the file
    dataframe = pd.read_csv(join(FILES_PATH, 'Tick', f), low_memory=False,
                            header=None, names=['Symbol', 'Date_Time', 'Bid', 'Ask'],
                            index_col=1, parse_dates=True)

    # Resample the data to minute-by-minute, Open-High-Low-Close format
    data_bid = dataframe['Bid'].resample('60S').ohlc()
    data_ask = dataframe['Ask'].resample('60S').ohlc()

    # Concatenate the OHLC data
    data_ask_bid = pd.concat([data_bid, data_ask], axis=1, keys=['Bid', 'Ask'])

    # Keep only non-weekend data (from Monday 00:00 until Friday 22:00)
    data_ask_bid = data_ask_bid[(((data_ask_bid.index.weekday >= 0) & (data_ask_bid.index.weekday <= 3)) |
                                 ((data_ask_bid.index.weekday == 4) & (data_ask_bid.index.hour < 22)))]

    # Save the processed and concatenated data of each month in a different folder
    data_ask_bid.to_csv(join(FILES_PATH, 'OHLC', f))
    print(f)

def main():
    start_time = time.time()

    # Get the paths for all the tick data files
    files_names = [f for f in listdir(join(FILES_PATH, 'Tick'))
                   if isfile(join(FILES_PATH, 'Tick', f))]

    num_cores = int(TOTAL_NUM_CORES / 2)
    print('Converting Tick data to OHLC...')
    print('Using ' + str(num_cores) + ' cores.')

    # Open and convert files in parallel
    Parallel(n_jobs=num_cores)(delayed(read_and_convert)(f, len(files_names)) for f in files_names)
    # for f in files_names: read_and_convert(f, len(files_names))  # non-parallel

    print("\nTook %s seconds." % (time.time() - start_time))

if __name__ == "__main__":
    main()
The first few files are processed very quickly this way, but as the script works through more and more files it starts to slow down, and RAM usage keeps climbing. Shouldn't joblib flush the data it no longer needs as it loops over the files?
Adding gc.collect() as the last line of the function you run in parallel avoids the RAM saturation. gc.collect() triggers Python's garbage collector.
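As a minimal, runnable sketch of the pattern (process_one and its fake workload are stand-ins for the question's read_and_convert, not the actual file processing):

import gc
from joblib import Parallel, delayed

def process_one(i):
    # Stand-in for the real per-file work: build a large temporary object.
    data = list(range(1_000_000))
    result = sum(data)
    # Last line of the parallel function: force a garbage-collection pass
    # so the worker frees this iteration's temporaries before the next task.
    gc.collect()
    return result

if __name__ == "__main__":
    totals = Parallel(n_jobs=2)(delayed(process_one)(i) for i in range(8))
    print(totals)

In the question's code, the same change amounts to adding import gc at the top and calling gc.collect() right after print(f) in read_and_convert.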