我是否正确使用了 python 池化?

Am I using python pooling properly?

我有一个非常简单的 python 脚本,它从列表(6K+ 长)中读取一个股票代码,并获取一些数据来标记交易日的异常交易量。

如果我只是 运行 循环遍历代码文件中的每一行,则需要数小时才能 运行。

基于一些谷歌搜索,我找到了这个多处理的粗略示例,并决定尝试实现它。

当我 运行 脚本时,它 运行 的速度要快得多,但也导致了一些我无法弄清楚的非常奇怪的问题。有时我会收到 redis 断路器错误,有时它会停止并挂在代码文件的末尾附近。

有什么想法吗?

import yfinance as yf
import multiprocessing
import time
import logging

file = open("C:\Users\miner\Desktop\unusual.txt", 'w')


def main():
    read_ticker_file()


def read_ticker_file():
    file1 = open("C:\Users\miner\Desktop\tickers.txt", 'r')
    lines = file1.readlines()

    count = 0

    ticker_arr = []

    for line in lines:
        count += 1
        line = line.strip('\n')
        line = line.strip()
        ticker_arr.append(line)

    return ticker_arr


def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            write_to_file(unusual_volume)
    except Exception as e:
        print(e)


def write_to_file(data):
    file.writelines(data + "\n")


if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()

    pool = multiprocessing.Pool(processes=20)
    pool.map(get_historical_data, inputs)
    pool.close()
    pool.join()
    # end = time.time()
    # print(start - end)

正如我在上面的评论中提到的,我认为您没有正确处理 unusual.txt 的输出。以下至少应该通过让您的工作人员函数 return 记录或 None 返回主进程进行写入来纠正该问题。我正在使用方法 imap 而不是 map 以便我可以懒惰地处理 return 值,因为它们是 returned。它们现在也将按照符号在输入文件中出现的顺序排列。如果输入文件有大量符号,我们不应该使用默认的 chunksize 参数,所以我提供了一个函数来计算一个合适的值。

import yfinance as yf
import multiprocessing
import time

def read_ticker_file():
    with open("C:\Users\miner\Desktop\tickers.txt", 'r') as f:
        return [line.strip() for line in f]

def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            return unusual_volume
        else:
            return None
    except Exception as e:
        print(e)
        return None

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()
    pool = multiprocessing.Pool(processes=20)
    chunksize = compute_chunksize(len(inputs), 20)
    results = pool.imap(get_historical_data, inputs, chunskize=chunksize)
    with open("C:\Users\miner\Desktop\unusual.txt", 'w') as f:
        for result in results:
            if result:
                print(result, file=f)
    # end = time.time()
    # print(start - end)

另一种方法

同样,除了您写给 unusual.txt 的问题之外,这不一定会解决您的问题,上面的代码也应该处理。但这是我编写解决方案并从那里开始工作的方式:

我不知道文件 tickers.txt 有多大,也不知道 yfinance 包有多大。但似乎很明显,对 yf.download 的调用和对 unusual.txt 的文件写入,我已经在上面的评论中指出了我不相信的是被正确处理的是 I/O 绑定的“进程”,不能由多线程池处理。目前尚不清楚剩下的是什么,即 current_volumeaverage_volume 的计算和比较是 CPU 密集的,足以证明使用多处理来执行这些计算的开销是合理的。

下面将完成所有下载和计算的单个函数 get_historical_data 拆分为两个函数 load_historical_data_and_processprocess_data。创建了一个大型多线程池和多处理池。使用带有函数 imap 的多线程池为 tickers.txt 中的每个符号调用辅助函数 load_historical_data_and_process,这是 [= 的“懒惰”版本15=]。也就是说,如果文件很大,不需要将所有符号读入内存,先构建一个map需要的列表;可以使用生成器函数。即使文件很小,使用 imap 也没有真正的缺点。 load_historical_data_and_process 将进行所有必要的下载。为了进行计算,它将使用通过阻塞方法 apply 传递给它的多线程池来调用辅助函数 process_data。通过 直接调用 函数 process_data 而不是使用多处理池来获得替代计时会很有趣。当然,在这种情况下,由于争夺全局解释器锁,在 process_data 的执行中跨线程实现的并发性非常小。但是,根据 process_data 的执行中涉及多少实际 CPU(我无法知道),CPU 您将不必传递参数和结果而节省进程边界可能会偏移。

import yfinance as yf
from multiprocessing.pool import ThreadPool, Pool
from functools import partial
import time

def get_symbols():
    with open("C:\Users\miner\Desktop\tickers.txt", 'r') as file1:
        for line in file1:
            yield line.strip()

def load_historical_data_and_process(multiprocessing_pool, symbol):
    """ What I believe is I/O-intensive and so this runs in a multithreading pool: """
    try:
        historical = yf.download(symbol, period="max", interval="1d")
        yahoo_ticker = yf.Ticker(symbol)
        current_volume = yahoo_ticker.info['volume']
        # To call directly:
        #return process_data(symbol, historical, current_volume)
        return multiprocessing_pool.apply(process_data, args=(symbol, historical, current_volume))
    except Exception as e:
        print(e)
        return None


def process_data(symbol, historical, current_volume):
    """ What I believe may warrant running in a multiprocessing pool: """
    average_volume_arr = historical['Volume']
    sum_volume = 0
    for volume in average_volume_arr:
        sum_volume += volume
    average_volume = sum_volume / len(average_volume_arr)
    if current_volume > average_volume:
        volume_over_average = (current_volume - average_volume) / average_volume
        volume_over_average = "{:.2%}".format(volume_over_average)
        unusual_volume_record = (symbol + " - " + str(volume_over_average))
        print(unusual_volume_record, flush=True)
        return unusual_volume_record
    else:
        return None

if __name__ == '__main__':
    # start = time.time()
    # or some suitable thread pool size:
    with Pool(processes=20) as multiprocessing_pool, ThreadPool(processes=100) as thread_pool:
        # pass multiprocessing pool to thread pool worker get_historical_data for CPU-intensive processing
        worker = partial(load_historical_data_and_process, multiprocessing_pool)
        results = thread_pool.imap(worker, get_symbols())
        with open("C:\Users\miner\Desktop\unusual.txt", 'w') as f:
            for result in results:
                if result:
                    print(result, file=f)
    # end = time.time()
    # print(start - end)