大数据集,ProcessPoolExecutor 问题

Large dataset, ProcessPoolExecutor issues

问题 - ProcessPoolExecutor 没有提高速度。由 tqdm

确认

对 python 的了解足以复制 and/or 编写一个有效的程序。每个文件需要大约 40 秒来加载->过滤->写入。我有大约 6,800 个文件需要处理,并且想要一个更好的版本来使用我所有的处理能力(6 核),我尝试编写那个版本(如下)。所述版本生成,但比我的原始函数稍慢:

from concurrent.futures import ProcessPoolExecutor
from glob import glob
from json import dump
from tqdm import tqdm
from pybufrkit.decoder import Decoder, generate_bufr_message
from pybufrkit.renderer import FlatJsonRenderer

decoder = Decoder()
DIRECTORY = 'C://blah/'
files = glob(DIRECTORY+'*.bufr')
PHI_MAX, PHI_MIN, LAMBDA_MAX, LAMBDA_MIN = x,x,x,x #Integers

def load_decode_filter(file):
    '''`
     Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open(file, 'rb') as ins:
        for bufr_message in generate_bufr_message(
                decoder,ins.read()):
            input_list = FlatJsonRenderer().render(bufr_message)[3][2] #necessary for [mask] to function
            mask = [obj for obj in input_list if ((PHI_MAX > obj[
                12] > PHI_MIN) & (LAMBDA_MAX > obj[13] > LAMBDA_MIN))]
            output_message.extend(mask)
        return output_message

def main(files_in):
    '''
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor(max_workers=6) as executor:
        with tqdm(range(len(files_in)), desc='files loaded',
                  position=0) as progress:
            futures = []
            for file in files_in:
                future = executor.submit(load_decode_filter(file), file)
                future.add_done_callback(lambda p: progress.update())
                futures.append(future)
            results = []
            for future in futures:
                result = future.result()
                results.append(result)
    with open(DIRECTORY+'bufrout.json', 'w', encoding='utf-8') as f_o:
        dump(results, f_o)

if __name__ == '__main__':
    main(files)

我希望至少减少每个文件的处理时间。


更新,结束:
首先,我要感谢所有评论的人以及回答者(我太新了,无法投票).似乎有意义地提高效率的唯一方法是首先从不解码并从原位 bufr 数据中获取我想要的东西,这完全超出了我目前的能力(这是我第一次接触任何类型的代码)。


我计划(目前)运行我的初始版本(f.bufr 输入,f.bufr_.txt 输出),我可以'会将处理过的文件移动到每个“运行”之后的子目录中。一线希望是我已经学到了足够多的知识,我将能够制作一个程序将所有文本输出组合到一个文件中。再次感谢。

Q :
" PROBLEM - ProcessPoolExecutor hasn't increased speed. Confirmed by tqdm "

A :
不,
恕我直言,
你的主要问题不是[=12=的效率]-实例,但是
你的主要问题是选择性能/效率(差不多)anti-patterns,其中Python,Windows O/S 领域中的 Python-sub-processes 越多,您将不得不等待大约 75 小时来收集所有结果(如果 processing-pipeline 确实如此)你期望它做什么,我无法判断,但我猜它不会......出于下面列出的原因)

SUSPECT #1:
最好避免 75 小时产生无意义的输出:

鉴于记录的标准 Py3 concurrent.futures.Executor()-实例 .submit()-方法的 call-signature,您的代码不符合此规范。

作为 calling-side 的 main() 不是传递对函数的引用,而是首先对 6800 个文件中的每一个执行完整的、纯 [SERIAL] METOP-workpackage 处理(产生一些昂贵的收集巨大 list-of-messages ),然后(与记录的传递对函数的引用的要求相反 / in-place lambda-operator )再次在非常巨大的 RAM/CPU/TIME 费用,SER/sent/DES-transferred 到 Executor 管理的 worker-processes 池之一(我怀疑在收到列表后是否能够做任何合理的事情,而不是一个函数(计划在这样的远程进程中执行,传递给它的参数 - 按照 calling-signature 指定)。哎哟...

def main( files_in ):
    '''                                                                 __doc__
    attempt to intiate all cores in loading and filter bufr files
    '''
    with ProcessPoolExecutor( max_workers = 6
                              )  as executor: #---------------------------# eXe CONTEXT-mgr
        
        with tqdm( range( len( files_in ) ),
                   desc     = 'files loaded',
                   position = 0
                   ) as progress: #---------------------------------------# pro CONTEXT-in-CONTEXT-mgr
            
            futures = []
            for file in files_in: #---------------------------------------#     LUXURY of top-level iterator, commanding  6800x times a pool of workers
                future = executor.submit( load_decode_filter( file ), #---#     ??? WHY CALC THE RESULT BEFORE passing it to the .submit() ???
                                                              file    #---#         std PARA
                                                              )
                future.add_done_callback( lambda p: progress.update() )   #     LUXURY of tdqm() for showing 75-hours of work ???
                futures.append( future ) #--------------------------------#     LUXURY of no performance gain
            
            results = []
            for future in futures:
                result = future.result()
                results.append( result ) #--------------------------------#     LUXURY of adverse performance gain
    
    with open( DIRECTORY + 'bufrout.json', 'w',
               encoding = 'utf-8'
               ) as f_o: #------------------------------------------------# f_o CONTEXT-mgr
        dump( results, f_o )

SUSPECT #2 :
最好避免所有 performance-degrading syntax-constructors,
如果性能是要实现的真正目标:

避免输入一种 low-hanging-fruits SLOC-s 的任何和所有罪恶,它看起来“性感”,但已经付出了巨大的 add-on 管理费用。

设计 process-flow 以便我们可以将 End-to-End 处理时间缩短 latency-masking,如果可能(file-I/O 是经典案例)并完全避免任何可减少的步骤(创建 named-variables(有时从未使用过)是类似的罪恶)。

鉴于你 运行 在 Windows O/S 内,你的(尽管隐藏)sub-process-instantiation 成本是所有其他案例中最高的 - Windows O/S 将生成 Python interpreter-process 的完整 top-down 副本以及所有 data-structures 等,因此如果这导致您的物理 RAM 获得“over-crowded",O/S 将开始(在剩下的 75 小时中......)令人讨厌的 war 颠簸 Virtual-Memory-managed file-I/O-传输 ( ~ 10.000x bigger latency ) from-RAM-to-disk & from-disk-to-RAM。这将有效地损害任何其他 CPU-from/to-RAM I/O-operations 我们可能会直接忘记任何关于提高性能的梦想。

根据 pybufrkit 的承诺,还有一次机会 - 获得 10% ~ 30% 的性能提升 - 如果您的“过滤器”可以使用 pybufrkit-templates 编译:

"(...) BUFR Template Compilation
The main purpose of Template Compilation is performance. However since bit operations are the most time consuming part in the overall processing. The performance gain somewhat is limited. Depending on the total number of descriptors to be processed for a message, template compilation provides 10 - 30% performance boost. Read the Docs "

As-was,entropy-reduced代码:

def load_decode_filter( file ):
    '''`
    Phi and Lambda ranges are ~1 degree so this eviscerates 99.7% of each file
    '''
    output_message = []
    with open( file, 'rb' ) as ins: #----------------------------------- # ins CONTEXT-mgr
        for idx,         \
            bufr_message  \
            in             \
            enumerate( generate_bufr_message( decoder,                   #     LUXURY of enumerate for no real use
                                              ins.read() # <-------------# ins.
                                              )
                       ):
            input_list = FlatJsonRenderer().render( bufr_message )[3][2] #     LUXURY of JSON-(re)-)decorations
            mask = [ obj for obj in input_list                           #
                                 if ( (    PHI_MAX > obj[12] >    PHI_MIN )
                                    & ( LAMBDA_MAX > obj[13] > LAMBDA_MIN )
                                      )
                     ]
            output_message.extend( mask )
        return output_message

性能提示,如果既没有设法使用 pybufrkit 本机 compiled-templates 也没有 native-scripted CLI 任务 pybufrkit 并求助于 Win/Py3 处理流程:

  • 考虑到 top-bottom 份 main-Python 翻译过程的全部支付费用,您的员工应该“知道”list-of-all-files,所以这个令人尴尬的独立 file-by-file 进程将尽力做到:

  • gc.collect(); gc.disable() 在产生任何工人池之前

  • 产生的 max_workers worker-processes 与 CPU-RAM 实体 memory-I/O-channels 一样少ware(任务是 memory-bound,不是 CPU )

  • 拆分,在 main() 侧 list-of-files 处理 - 使用 max_workers-many,balanced-length,non-overlapping ( from_fileIDX, to_fileIDX )

    的元组
  • executor.submit() 一个 block-processing function-reference,带有一个 ( from_, to_ ) 的元组,并将所有其余部分安排在这样的 block-processing 函数中,包括结果的 latency-masked file-I/O 存储(可能稍后合并,使用 O/S text/binary-file 合并)

  • 更喜欢 latency-masking 流程,使用 syntax-sugar(ed) 迭代器在 school-book 示例中可能很好,但这里是 ( un-maskable )性能杀手 - 收集 [ obj for obj in ... if ... ] 的 huge-list 永远不会改善 stream-alike(可屏蔽延迟)process-flow,而不先收集这样的 huge-list,只是为了下一个( re)-将这样的 huge-list 到 file-I/O 这样的列表的项目逐一迭代到 disk-file 上。更好的 iterate/filter/conditionally 一次执行 file-I/O-ops,单个 stream-of-steps(减少 RAM,避免 add-on 开销和所有具有可屏蔽延迟的开销)

有关更多详细信息,您可能希望阅读 this and code from this 以及相关示例。