Am I using python pooling properly?
I have a pretty simple python script that reads a ticker symbol from a list (6K+ long) and grabs some data to flag unusual trading volume during the trading day.
If I just run a loop over each of the lines in the ticker file, it takes hours to run.
Based on some googling, I found this crude example of multiprocessing and decided to try implementing it.
When I run the script it runs WAY faster, but it has also caused some really strange issues I can't figure out. Sometimes I get a redis circuit breaker error, and other times it just stops and hangs near the end of the ticker file.
Any thoughts?
import yfinance as yf
import multiprocessing
import time
import logging

# Note: the paths need raw strings; "C:\Users\..." is a SyntaxError in Python 3
file = open(r"C:\Users\miner\Desktop\unusual.txt", 'w')

def main():
    read_ticker_file()

def read_ticker_file():
    file1 = open(r"C:\Users\miner\Desktop\tickers.txt", 'r')
    lines = file1.readlines()
    count = 0
    ticker_arr = []
    for line in lines:
        count += 1
        line = line.strip('\n')
        line = line.strip()
        ticker_arr.append(line)
    return ticker_arr

def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            write_to_file(unusual_volume)
    except Exception as e:
        print(e)

def write_to_file(data):
    file.writelines(data + "\n")

if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()
    pool = multiprocessing.Pool(processes=20)
    pool.map(get_historical_data, inputs)
    pool.close()
    pool.join()
    # end = time.time()
    # print(start - end)
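One hazard worth making concrete: with the default start method on Windows ("spawn"), each pool worker re-imports the module and so re-executes the module-level open(..., 'w'), and mode 'w' truncates the file on every open. A minimal sketch of just that truncation behavior, using a temp file in place of the hard-coded path:

```python
import os
import tempfile

# Stand-in for the hard-coded C:\Users\... path in the script above.
path = os.path.join(tempfile.mkdtemp(), "unusual.txt")

with open(path, "w") as f:
    f.write("a record written by one process\n")

# Re-opening with mode 'w' -- which is what each spawned worker effectively
# does when it re-runs the module-level open() -- truncates what was written.
open(path, "w").close()
print(os.path.getsize(path))   # -> 0
```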
As I mentioned in my comment above, I don't believe you are handling the output to unusual.txt correctly. The following should at least correct that problem by having your worker function return the record (or None) back to the main process for writing. I am using method imap
instead of map
so that I can lazily process the return values as they are returned. They will now also be in the order the symbols appear in the input file. If the input file has a large number of symbols, we should not use the default chunksize argument, so I have provided a function to compute a suitable value.
import yfinance as yf
import multiprocessing
import time

def read_ticker_file():
    with open(r"C:\Users\miner\Desktop\tickers.txt", 'r') as f:
        return [line.strip() for line in f]

def get_historical_data(symbol):
    yahoo_ticker = yf.Ticker(symbol)
    historical = yf.download(symbol, period="max", interval="1d")
    average_volume_arr = historical['Volume']
    try:
        current_volume = yahoo_ticker.info['volume']
        sum_volume = 0
        for volume in average_volume_arr:
            sum_volume += volume
        average_volume = sum_volume / len(average_volume_arr)
        if current_volume > average_volume:
            volume_over_average = (current_volume - average_volume) / average_volume
            volume_over_average = "{:.2%}".format(volume_over_average)
            unusual_volume = (symbol + " - " + str(volume_over_average))
            print(unusual_volume)
            return unusual_volume
        else:
            return None
    except Exception as e:
        print(e)
        return None

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    # start = time.time()
    inputs = read_ticker_file()
    pool = multiprocessing.Pool(processes=20)
    chunksize = compute_chunksize(len(inputs), 20)
    results = pool.imap(get_historical_data, inputs, chunksize=chunksize)
    with open(r"C:\Users\miner\Desktop\unusual.txt", 'w') as f:
        for result in results:
            if result:
                print(result, file=f)
    # end = time.time()
    # print(start - end)
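To make the chunking heuristic concrete, here it is evaluated standalone for a ticker list of roughly 6K symbols (the exact count is an assumption) and the 20-process pool used above:

```python
def compute_chunksize(iterable_size, pool_size):
    # Target roughly 4 chunks per worker, rounding up on any remainder.
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

# 6000 symbols across 20 processes: divmod(6000, 80) == (75, 0) -> 75
print(compute_chunksize(6000, 20))   # -> 75
# A remainder rounds the chunksize up so no undersized final chunk is left over:
print(compute_chunksize(6001, 20))   # -> 76
```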
An Alternate Approach
Again, this won't necessarily solve your problems other than the issue of how you were writing to unusual.txt, which the code above should also handle. But this is how I would have coded the solution and worked from there:
I don't know how large the file tickers.txt is, nor how the yfinance
package works internally. But it seems clear that the calls to yf.download
and the writing of the file unusual.txt (which, as I have already pointed out in my comment above, I don't believe is being handled correctly) are I/O-bound "processes" that should be handled by a multithreading pool. It is not clear whether what remains, i.e. the computation and comparison of current_volume
against average_volume
, is CPU-intensive enough to justify the overhead of using multiprocessing to perform those calculations.
Below, the single function get_historical_data
, which did all the downloading and computation, has been split into two functions, load_historical_data_and_process
and process_data
. A large multithreading pool and a multiprocessing pool are created. Worker function load_historical_data_and_process
is invoked for each symbol in tickers.txt using the multithreading pool with function imap
, which is the "lazy" version of map. That is, if the file is large, it is not necessary to read all the symbols into memory and first build the list that map
would require; a generator function can be used. Even if the file is small, there is no real downside to using imap
. load_historical_data_and_process
will do all the necessary downloading. To do the computation, it will use the multiprocessing pool that has been passed to it, invoking worker function process_data
via blocking method apply
. It would be interesting to get alternate timings by calling function process_data
directly instead of using the multiprocessing pool. In that case, of course, very little concurrency would be achieved across threads in the execution of process_data
due to contention for the Global Interpreter Lock. But depending on how much actual CPU is involved in the execution of process_data
(which I have no way of knowing), the CPU you save by not having to pass arguments and results across process boundaries might offset that.
import yfinance as yf
from multiprocessing.pool import ThreadPool, Pool
from functools import partial
import time

def get_symbols():
    with open(r"C:\Users\miner\Desktop\tickers.txt", 'r') as file1:
        for line in file1:
            yield line.strip()

def load_historical_data_and_process(multiprocessing_pool, symbol):
    """ What I believe is I/O-intensive and so this runs in a multithreading pool: """
    try:
        historical = yf.download(symbol, period="max", interval="1d")
        yahoo_ticker = yf.Ticker(symbol)
        current_volume = yahoo_ticker.info['volume']
        # To call directly:
        #return process_data(symbol, historical, current_volume)
        return multiprocessing_pool.apply(process_data, args=(symbol, historical, current_volume))
    except Exception as e:
        print(e)
        return None

def process_data(symbol, historical, current_volume):
    """ What I believe may warrant running in a multiprocessing pool: """
    average_volume_arr = historical['Volume']
    sum_volume = 0
    for volume in average_volume_arr:
        sum_volume += volume
    average_volume = sum_volume / len(average_volume_arr)
    if current_volume > average_volume:
        volume_over_average = (current_volume - average_volume) / average_volume
        volume_over_average = "{:.2%}".format(volume_over_average)
        unusual_volume_record = (symbol + " - " + str(volume_over_average))
        print(unusual_volume_record, flush=True)
        return unusual_volume_record
    else:
        return None

if __name__ == '__main__':
    # start = time.time()
    # or some suitable thread pool size:
    with Pool(processes=20) as multiprocessing_pool, ThreadPool(processes=100) as thread_pool:
        # pass the multiprocessing pool to thread pool worker
        # load_historical_data_and_process for CPU-intensive processing
        worker = partial(load_historical_data_and_process, multiprocessing_pool)
        results = thread_pool.imap(worker, get_symbols())
        with open(r"C:\Users\miner\Desktop\unusual.txt", 'w') as f:
            for result in results:
                if result:
                    print(result, file=f)
    # end = time.time()
    # print(start - end)
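One more observation on process_data: the hand-rolled summing loop can be replaced by the pandas Series.mean() method (historical['Volume'] is a pandas Series), which runs vectorized in C and may make the computation cheap enough that no multiprocessing pool is warranted at all. A sketch with a synthetic Series standing in for the downloaded data (the helper name is_unusual is mine, not from the original script):

```python
import pandas as pd

def is_unusual(current_volume, volumes):
    """Return a percent-over-average string if volume is elevated, else None.
    (Illustrative helper, not part of the original script.)"""
    average_volume = volumes.mean()   # vectorized replacement for the manual loop
    if current_volume > average_volume:
        return "{:.2%}".format((current_volume - average_volume) / average_volume)
    return None

# Synthetic volumes averaging 100; a current volume of 150 is 50% over average.
volumes = pd.Series([50, 100, 150])
print(is_unusual(150, volumes))   # -> 50.00%
print(is_unusual(80, volumes))    # -> None
```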