Python: How to apply concurrent.futures to a function

I'd like to know a good way to use concurrent futures to iterate through a large list of stocks in the new program.

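In my previous program I tried to use concurrent futures, but the printed data was not consistent. For example, when running a large list of stocks, it gave different information every time (as you can see in Outputs 1 and 2 of the previous program). I'm including my previous program so you can see what I did wrong when implementing concurrent futures.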

Thanks!

New Program

import time

import pandas as pd
import yfinance as yf

start = time.time()

tickers = ["A","AA","AAC","AACG","AACIU","AADI","AAIC","AAIN","AAL","AAMC","AAME","AAN","AAOI","AAON","AAP","AAPL"]

def create_df(tickers):
    # Fetch the .info dict for each ticker sequentially (one request at a time)
    all_info = []
    for ticker in tickers:
        all_info.append(yf.Ticker(ticker).info)

    df = pd.DataFrame.from_records(all_info)
    # Keep only the columns of interest and drop rows with missing values
    df = df[['symbol', 'ebitda', 'enterpriseValue', 'trailingPE', 'sector']].dropna()
    # This is where you can add calculations and other columns not in the yfinance library
    df['EV/Ratio'] = df['enterpriseValue'] / df['ebitda']
    return df

df = create_df(tickers)
print(df)
print('It took', time.time() - start, 'seconds.')

Output

   symbol        ebitda  enterpriseValue  trailingPE              sector   EV/Ratio
0       A  1.762000e+09     5.311271e+10   60.754720          Healthcare  30.143422
9    AAMC -2.015600e+07     1.971329e+08    1.013164  Financial Services  -9.780359
10   AAME  2.305600e+07     1.175756e+08    7.652329  Financial Services   5.099566
11    AAN  8.132960e+08     1.228469e+09    9.329710   Consumer Cyclical   1.510483
13   AAON  1.178790e+08     3.501286e+09   55.615944         Industrials  29.702376
14    AAP  1.239876e+09     1.609877e+10   25.986680   Consumer Cyclical  12.984181
15   AAPL  1.109350e+11     2.489513e+12   33.715443          Technology  22.441190
It took 101.81006002426147 seconds.

Previous Program (for Reference)

import concurrent.futures
import time

import pandas as pd
import yfinance as yf

tickers = ["A","AA","AAC","AACG","AACIU","AADI","AAIC","AAIN","AAL","AAMC","AAME","AAN","AAOI","AAON","AAP","AAPL"]
start = time.time()

col_a = []  
col_b = []  
col_c = []  
col_d = []  

print('Loading Data... Please wait for results')


def do_something(tickers):
    print('---', tickers, '---')
    all_info = yf.Ticker(tickers).info
    try:
        a = all_info.get('ebitda')
        b = all_info.get('enterpriseValue')
        c = all_info.get('trailingPE')
        d = all_info.get('sector')
    except:
        None
    col_a.append(a)  
    col_b.append(b)  
    col_c.append(c)  
    col_d.append(d)     
    return
with concurrent.futures.ThreadPoolExecutor() as executer:
    executer.map(do_something, tickers)
        

# Dataframe Set Up
pd.set_option("display.max_rows", None)
   
df = pd.DataFrame({
    'Ticker': tickers,
    'Ebitda': col_a,  
    'EnterpriseValue' :col_b,  
    'PE Ratio': col_c,  
    'Sector': col_d,
})
print(df.dropna())
print(len('Total Companies with Information'))
print('It took', time.time()-start, 'seconds.')

Previous Program, Output 1

   Ticker        Ebitda  EnterpriseValue   PE Ratio              Sector
1      AA  1.651000e+09     5.031802e+10  49.183292          Healthcare
3    AACG  2.216000e+09     1.168140e+10  11.711775     Basic Materials
5    AADI  1.928800e+07     1.108360e+08   6.954397  Financial Services
7    AAIN  1.128370e+08     3.960835e+09  57.706764         Industrials
8     AAL  8.303301e+08     1.103969e+09   9.111819   Consumer Cyclical
10   AAME  1.202330e+11     2.534678e+12  26.737967          Technology
12   AAOI -1.848400e+07     1.277540e+08   0.355233  Financial Services
14    AAP  1.224954e+09     1.770882e+10  26.059464   Consumer Cyclical
32
It took 4.2548089027404785 seconds.

Previous Program, Output 2

   Ticker        Ebitda  EnterpriseValue   PE Ratio              Sector
0       A -1.848400e+07     1.277540e+08   0.355233  Financial Services
4   AACIU  1.202330e+11     2.534678e+12  26.737967          Technology
5    AADI  1.651000e+09     5.031802e+10  49.183292          Healthcare
7    AAIN  1.128370e+08     3.960835e+09  57.706764         Industrials
9    AAMC  8.303301e+08     1.103969e+09   9.111819   Consumer Cyclical
10   AAME  2.216000e+09     1.168140e+10  11.711775     Basic Materials
13   AAON  1.224954e+09     1.770882e+10  26.059464   Consumer Cyclical
14    AAP  1.928800e+07     1.108360e+08   6.954397  Financial Services
32
It took 4.003742933273315 seconds.

Here is the answer, provided by @iudeen, on how to add multithreading to the new function:
import time
from concurrent.futures import ThreadPoolExecutor

import pandas as pd
import yfinance as yf

from stocks import tickers  # the asker's own module holding the ticker list

start = time.time()

print('Loading Data... Please wait for results')
all_info = []

def create_df(ticker):
    # Runs in a worker thread. Append order does not matter here because
    # each info dict carries its own 'symbol' key.
    all_info.append(yf.Ticker(ticker).info)

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(create_df, x) for x in tickers]
# Leaving the with-block waits for every submitted task to finish.
# Exceptions raised inside create_df stay stored in `futures` unless f.result() is called.

df = pd.DataFrame.from_records(all_info)
df = df[['symbol', 'ebitda', 'enterpriseValue', 'trailingPE', 'sector']].dropna()
df['EV/Ratio'] = df['enterpriseValue'] / df['ebitda']
print(df)
print('It took', time.time() - start, 'seconds.')
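If you would rather not share a list between threads at all, a minimal sketch is to let executor.map() return the info dicts directly. The names fetch_all and fake_info are hypothetical; fake_info only simulates yf.Ticker(t).info so the example runs without network access.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_all(tickers, fetch_info, max_workers=10):
    """Return one info dict per ticker, in the same order as `tickers`.

    `fetch_info` is any callable mapping a symbol to a dict. In the real
    program it would be something like `lambda t: yf.Ticker(t).info`
    (an assumption; it is not called here).
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() yields results in input order and re-raises any exception
        # raised inside fetch_info when that result is reached
        return list(executor.map(fetch_info, tickers))


# Hypothetical stand-in for yf.Ticker(t).info so the sketch runs offline
def fake_info(ticker):
    time.sleep(0.01 * (4 - len(ticker)))  # shorter symbols "respond" more slowly
    return {"symbol": ticker, "ebitda": len(ticker)}


records = fetch_all(["A", "AA", "AAPL"], fake_info)
print([r["symbol"] for r in records])  # ['A', 'AA', 'AAPL']
```

The returned list can be passed straight to pd.DataFrame.from_records(records). Unlike the submit-and-append version, a failed request surfaces as an exception instead of silently producing a missing row.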

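You have a multithreaded program. ThreadPoolExecutor.map() schedules one call to do_something() per ticker on a pool of worker threads that run concurrently, and you have no control over the order in which those calls execute or finish. The problem arises because do_something() appends its results (a, b, c, d) to the separate lists col_a, col_b, etc. These lists are global, so the data is appended to them in more or less random order. A thread switch can even happen in the middle of the four append() calls. So not only is the order of the rows random, an individual row can end up mixing values from different tickers.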
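A small offline sketch reproduces the effect. The per-ticker latencies are made up (an assumption) and time.sleep() stands in for the network call; because earlier tickers finish later, the shared list ends up in completion order rather than input order, just like col_a to col_d.

```python
import time
from concurrent.futures import ThreadPoolExecutor

tickers = ["A", "AA", "AAC", "AACG"]
# Hypothetical latencies: earlier tickers "respond" more slowly
latency = {"A": 0.15, "AA": 0.10, "AAC": 0.05, "AACG": 0.0}

col_symbol = []  # shared list appended from worker threads (the buggy pattern)


def do_something(ticker):
    time.sleep(latency[ticker])  # stand-in for yf.Ticker(ticker).info
    col_symbol.append(ticker)    # appended in completion order, not input order


with ThreadPoolExecutor(max_workers=4) as executor:
    executor.map(do_something, tickers)

# Pairing the main-thread `tickers` list with `col_symbol` row by row
# now matches tickers with data that belongs to other tickers.
for t, s in zip(tickers, col_symbol):
    print(t, "->", s)
```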

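The ticker list, on the other hand, is added to the DataFrame in the main thread, in its original order. So the list of symbols and the data itself are out of sync, which is exactly what you observed.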

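The simplest solution is to build all the data structures in the main thread. This is easy to do, because map() returns an iterator whose iteration order is guaranteed to match the order of the inputs, and that iterator yields the values returned by do_something(). So instead of trying to update col_a, col_b, etc. inside that function, return the values a, b, c, d as a tuple. Back in the main thread, take those values and append them to the columns.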

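The execution order of the threads is not under your control, but map() sorts that out for you: it yields the results in input order, waiting for each one in turn if it has not finished yet.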
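To see the difference between completion order and the order map() yields, a sketch with the same made-up latencies (an assumption) compares concurrent.futures.as_completed(), which yields futures as they finish, with executor.map().

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

tickers = ["A", "AA", "AAC", "AACG"]
# Hypothetical latencies: earlier tickers "respond" more slowly
latency = {"A": 0.15, "AA": 0.10, "AAC": 0.05, "AACG": 0.0}


def fetch(ticker):
    time.sleep(latency[ticker])  # stand-in for the network call
    return ticker


with ThreadPoolExecutor(max_workers=4) as executor:
    # as_completed() yields futures as they finish: completion order
    futures = [executor.submit(fetch, t) for t in tickers]
    completion_order = [f.result() for f in as_completed(futures)]

    # map() yields results in input order, waiting on each in turn
    map_order = list(executor.map(fetch, tickers))

print("completion order:", completion_order)
print("map order:       ", map_order)
```

as_completed() is a good fit when each result carries its own key (like the 'symbol' field in the info dict); map() is the simpler choice when results must line up with a separate list such as tickers.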

Change this part of the program; everything else can stay the same.

def do_something(ticker):
    print('---', ticker, '---')
    try:
        # The network call is what can fail, so keep it inside the try
        all_info = yf.Ticker(ticker).info
        a = all_info.get('ebitda')
        b = all_info.get('enterpriseValue')
        c = all_info.get('trailingPE')
        d = all_info.get('sector')
    except Exception:
        return None, None, None, None  # must still return a 4-tuple
    return a, b, c, d

with concurrent.futures.ThreadPoolExecutor() as executer:
    # map() yields the returned tuples in the same order as `tickers`,
    # so each row stays aligned with its ticker symbol
    for a, b, c, d in executer.map(do_something, tickers):
        col_a.append(a)
        col_b.append(b)
        col_c.append(c)
        col_d.append(d)