在 for 循环中使用预加载数据的线程安全方式

Thread-safe way to use preloaded data in a for-loop

假设我们在一个 for 循环中对大部分相同的基础数据(可变)应用一组(就地)操作。什么是内存高效(和线程安全)的方法?

请注意,基本数据不应在 for 循环中从迭代到迭代进行更改。

示例代码:

假设我们在 data 目录中有一些包含基本数据的 Excel 文件。此外,我们在 some_more_data 目录中还有一些其他数据。我想使用 some_more_data 目录中的文件对从 data 目录中检索到的数据应用操作。之后我想将结果打印到一个新的 pickle 文件中。

import copy
import pickle
import pandas as pd

# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):

   data_dict = dict()
   for dname, dpath in info_dict.items():
       data_dict[dname] = pd.read_excel(dpath, index_col=0)

   return data_dict

# list of data files
data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
raw_data = read_data(data_list)

# list of files used for operation (they, for example, have different indices)
some_more_data= {
    'some_data_a': 'some_more_data/some_data_a.xlsx',
    'some_data_b': 'some_more_data/some_data_b.xlsx'
    }
some_more_data = read_data(some_more_data)

# Apply operation to data (explicitly use a for-loop)
for smd_k, smd_v in some_more_data.items():
   rdata = copy.deepcopy(raw_data)

   rdata['price'] = rdata['price'].reindex(smd_v.index)
   rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)

   with open(f'data/changed_{smd_k}.pkl', 'wb') as handle:
        pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

我上面例子中的deepcopy操作是线程安全的吗(假设我想使用多线程)?或者我应该在 for 循环中重复从 Excel 加载数据(非常慢)?或者有更好的方法吗?

感谢您的帮助。


生成示例数据帧并将数据保存在 Excel 文件中的代码

注意必须先手动创建目录datasome_more_data

import pandas as pd
import numpy as np


price = pd.DataFrame([[-1.332298,  0.396217,  0.574269, -0.679972, -0.470584,  0.234379],
                      [-0.222567,  0.281202, -0.505856, -1.392477,  0.941539,  0.974867],
                      [-1.139867, -0.458111, -0.999498,  1.920840,  0.478174, -0.315904],
                      [-0.189720, -0.542432, -0.471642,  1.506206, -1.506439,  0.301714]],
                     columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'], 
                     index=pd.date_range('2000', freq='D', periods=4))

eps = pd.DataFrame([[-1.91,  1.63,  0.51, -.32, -0.84,  0.37],
                      [-0.56,  0.02, 0.56, 1.77,  0.99,  0.97],
                      [-1.67, -0.41, -0.98,  1.20,  0.74, -0.04],
                      [-0.80, -0.43, -0.12,  1.06, 1.59,  0.34]],
                     columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'], 
                     index=pd.date_range('2000', freq='D', periods=4))

some_data_a = pd.DataFrame(np.random.randint(0,100,size=(4, 6)), columns=['IBM', 'MSFT', 'APPL', 'ORCL','FB','TWTR'], index=pd.date_range('2001', freq='D', periods=4))
some_data_b = pd.DataFrame(np.random.randint(0,100,size=(20, 6)), columns=['GM', 'TSLA', 'IBM', 'MSFT', 'APPL', 'ORCL'], index=pd.date_range('2000', freq='D', periods=20))

price.to_excel('data/price.xlsx')
eps.to_excel('data/eps.xlsx')
some_data_a.to_excel('some_more_data/some_data_a.xlsx')
some_data_b.to_excel('some_more_data/some_data_b.xlsx')

创建 raw_data 字典后,我看不到它在哪里被修改(毕竟, 使用 deepcopy 就可以了)。因此,虽然深度复制可变对象不是线程安全的,但这个特定对象在任何时候都不会发生变化。所以我不明白为什么会有问题。但是,如果您没有信心,您总是可以在锁的控制下进行 deepcopy

如果您使用多线程执行此操作,那么使用 threading.Lock 可能不会降低性能,因为深度复制操作都是 CPU 并且您无法实现任何 deepcopy 并行性,因为您的线程已经锁定了该函数的全局解释器锁 (GIL)(它主要是 Python 字节码)。这种额外的锁定只是防止在 deepcopy 操作中间放弃您的时间片到另一个可能开始 deepcopy 操作的线程(但我仍然认为这不是问题) .但是,如果您使用多线程,那么执行并发 I/O 操作会带来什么样的性能提升?根据您拥有的是硬盘驱动器还是固态驱动器以及该驱动器的特性,并发性甚至可能会损害您的 I/O 性能。如果 Pandas 操作发布了 GIL,您可能会从中获得一些性能改进。

多处理确实提供 CPU 密集型函数的真正并行性,在创建进程和将数据从一个地址 space 传递到另一个地址(即一个进程)时有其自身的开销给另一个)。您在串行处理中没有的这种额外开销必须通过并行计算所实现的节省来补偿。从您所展示的内容中不清楚,如果这确实代表了您的实际情况,那么您会从这种并行性中获得任何好处。但是,当然,您不必担心 deepcopy 的线程安全性,因为一旦每个进程都有 raw_data 的副本,该进程将是 运行 一个单独的线程及其自己的记忆副本完全相互隔离。

总结

  1. 一般来说,deepcopy 对于可变对象来说不是线程安全的,但是由于您的对象似乎没有“变化”,所以这应该不是问题。但是,如果 运行 在多线程下,您可以在 multithreading.Lock 的控制下将 deepcopy 操作作为原子操作来执行,而不会显着降低性能。

  2. 如果您正在使用多处理,并且假设 raw_data 没有在共享内存中实现,那么每个进程将在其自己的 raw_data 副本上开始工作.因此,即使另一个进程正在“变异”raw_data,只要任何一个进程是运行单线程,就无需担心deepcopy.[=的线程安全。 29=]

  3. 根据我看到的代码,尚不清楚多线程或多处理是否会实现任何性能改进。

基准

这对串行、多线程和多处理进行了基准测试。也许每个字典中只有 2 个键,这不是一个现实的例子,但它给出了一个总体思路:

import copy
import pickle
import pandas as pd
import time
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import cpu_count


# Excel import function to obtain dictionary of pandas DataFrames.
def read_data(info_dict):

   data_dict = dict()
   for dname, dpath in info_dict.items():
       data_dict[dname] = pd.read_excel(dpath, index_col=0)

   return data_dict

def serial(raw_data, some_more_data, suffix):
    # Apply operation to data (explicitly use a for-loop)
    for smd_k, smd_v in some_more_data.items():
       rdata = copy.deepcopy(raw_data)

       rdata['price'] = rdata['price'].reindex(smd_v.index)
       rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)

       with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
            pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

def init_pool(r_d, sfx):
    global raw_data, suffix
    raw_data = r_d
    suffix = sfx

def worker(smd_k, smd_v):
    rdata = copy.deepcopy(raw_data)

    rdata['price'] = rdata['price'].reindex(smd_v.index)
    rdata['eps'] = rdata['eps'].reindex(columns=smd_v.columns)

    with open(f'data/changed_{smd_k}_{suffix}.pkl', 'wb') as handle:
         pickle.dump(rdata, handle, protocol=pickle.HIGHEST_PROTOCOL)

def benchmark1(raw_data, some_more_data):
    start_time = time.time()
    serial(raw_data, some_more_data, '1')
    elapsed = time.time() - start_time
    print('Serial time:', elapsed)

def benchmark2(raw_data, some_more_data):
    start_time = time.time()
    items = list(some_more_data.items())
    pool_size = len(items)
    pool = ThreadPool(pool_size, initializer=init_pool, initargs=(raw_data, '2'))
    pool.starmap(worker, items)
    elapsed = time.time() - start_time
    print('Multithreading time:', elapsed)
    pool.close()
    pool.join()

def benchmark3(raw_data, some_more_data):
    start_time = time.time()
    items = list(some_more_data.items())
    pool_size = min(len(items), cpu_count())
    pool = Pool(pool_size, initializer=init_pool, initargs=(raw_data, '3'))
    pool.starmap(worker, items)
    elapsed = time.time() - start_time
    print('Multiprocessing time:', elapsed)
    pool.close()
    pool.join()

def main():
# list of data files
    data_list = {'price': 'data/price.xlsx', 'eps': 'data/eps.xlsx'}
    raw_data = read_data(data_list)

    # list of files used for operation (they, for example, have different indices)
    some_more_data= {
        'some_data_a': 'some_more_data/some_data_a.xlsx',
        'some_data_b': 'some_more_data/some_data_b.xlsx'
        }
    some_more_data = read_data(some_more_data)

    benchmark1(raw_data, some_more_data)
    benchmark2(raw_data, some_more_data)
    benchmark3(raw_data, some_more_data)

if __name__ == '__main__':
    main()

打印:

Serial time: 0.002997159957885742
Multithreading time: 0.013999462127685547
Multiprocessing time: 0.7790002822875977

跟进我自己的评论并借鉴@booboo 的优秀示例。

尝试更改这两行,而不是

rdata = copy.deepcopy(raw_data)

使用

rdata = raw_data.copy()

文档说 pandas.dataFrame.copy() 创建数据框的深层副本。如果数据框中有对象,pandas 使用这些对象的引用。因为您的数据不包含任何对象,所以不会使用任何引用。

在我看来,pandas.dataFrame.copy() 提供的性能略高于标准库。

pandas.dataFrame.copy() 它打印

Serial time: 0.0017809867858886719
Multithreading time: 0.009797096252441406
Multiprocessing time: 0.3455531597137451

使用 copy.deepcopy(raw_data) 它打印

Serial time: 0.0018610954284667969
Multithreading time: 0.01049661636352539
Multiprocessing time: 0.34659790992736816

差别不大,但速度快了一点。由于您已经在使用 pandas,因此它可以少导入一个库,并且在内存使用方面也可能比标准库更高效。对于制作 dataFrames 的深拷贝,只要 dataFrames 用于存储数据而不是存储对象,它比系统库稍微好一些。