如何将 for 循环放到线程中运行,以加快 for 循环的速度

how to run for loop as threads so it speeds up the for loop

我有一个 for 循环,它在其中获取每个键的历史数据(historical_data 函数)。现在我有大约200个加密货币,所以要一个一个地获取历史数据需要很长时间。现在我想知道是否可以在单独的线程中获取每个交易品种的历史数据。

代码会遍历每个交易品种和每个时间间隔。然后我给各列命名,并删除一些用不到的列。每次循环结束后,数据都会被追加到列表中。如果这些请求必须一个接一个地运行,就会花费很长时间,所以我想尝试把它们放到线程中运行,但我不知道具体该怎么做。

提前致谢!

class crypto:
    """Namespace for the BTC-quoted symbols and their recent kline history.

    The symbol list is built once, while the class body executes, from the
    ``allsymbols`` file in the current working directory (one entry per
    line); ``'BTC'`` is appended to every entry.
    """

    symbols = []
    # Plain read mode is sufficient: the file is only iterated, and 'r+'
    # would additionally require write permission on it.
    with open('allsymbols', 'r') as f:
        for line in f:
            base = line.strip()
            if not base:
                # A blank line would otherwise produce the bogus symbol 'BTC'.
                continue
            symbols.append(base + 'BTC')

    intervals = ['1m', '5m']

    @staticmethod
    def historical_data():
        """Fetch recent klines for every symbol/interval combination.

        For each pair taken from ``crypto.symbols`` and ``crypto.intervals``
        the response of ``client.get_historical_klines`` is sliced with
        ``[-22:-1]`` (at most 21 rows, the final row of the response is
        dropped), reduced to the ``time``/``high``/``low``/``close`` columns,
        tagged with its interval and symbol, and converted with
        ``DataFrame.to_dict()``.

        Returns:
            list[dict]: one ``DataFrame.to_dict()`` result per pair, ordered
            by symbol first and interval second.
        """
        kline_columns = ['time', '1', 'high', 'low', 'close', '5', '6', '7', '8', '9', '10', '11']
        unused_columns = ['1', '5', '6', '7', '8', '9', '10', '11']
        price_columns = ['high', 'low', 'close']

        historical_list = []
        for symbol in crypto.symbols:
            for interval in crypto.intervals:
                klines = client.get_historical_klines(symbol, interval, '11/19/2021', limit=1000)[-22:-1]
                # Naming the columns in the constructor also works for an
                # empty response; assigning ``.columns`` afterwards raises a
                # length-mismatch ValueError on a frame without columns.
                historical_df = pd.DataFrame(klines, columns=kline_columns)
                historical_df = historical_df.drop(columns=unused_columns)
                historical_df['interval'] = interval
                historical_df['symbol'] = symbol
                # Column-wise conversion; no need to visit the frame row by row.
                historical_df[price_columns] = historical_df[price_columns].apply(pd.to_numeric)
                # 'time' is divided by 1000 and then interpreted as seconds.
                historical_df['time'] = pd.to_datetime(historical_df['time'] / 1000, unit='s')
                historical_list.append(historical_df.to_dict())
        return historical_list

注意:我将列表转换为字典,然后再次将字典的值转换为列表。这可能会减慢进程很多?也许有一种更简单的方法可以做到这一点,但我不知道该怎么做。不过这不是重点。

    @staticmethod
    def refactor_list(historical_list):
        historical_list_refactored = []
        for i in range(len(historical_list)):
            single_key_data = historical_list[i]
            single_key_data['high'] = list(single_key_data['high'].values())
            single_key_data['low'] = list(single_key_data['low'].values())
            single_key_data['close'] = list(single_key_data['close'].values())
            single_key_data['interval'] = list(single_key_data['interval'].values())
            single_key_data['symbol'] = list(single_key_data['symbol'].values())
            single_key_data['time'] = list(single_key_data['time'].values())
            historical_list_refactored.append(single_key_data)
        return historical_list_refactored

符号列表:

['1INCHBTC', 'AAVEBTC', 'ACMBTC', 'ADABTC', 'ADXBTC', 'AERGOBTC', 'AGIXBTC', 'AGLDBTC', 'AIONBTC', 'AKROBTC', 'ALGOBTC', 'ALICEBTC', 'ALPACABTC', 'ALPHABTC', 'AMBBTC', 'ANKRBTC', 'ANTBTC', 'APPCBTC', 'ARDRBTC', 'ARPABTC', 'ARBTC', 'ARKBTC', 'ASRBTC', 'ASTBTC', 'ATABTC', 'ATMBTC', 'ATOMBTC', 'AUCTIONBTC', 'AUDIOBTC', 'AUTOBTC', 'AVABTC', 'AVAXBTC', 'AXSBTC', 'BADGERBTC', 'BAKEBTC', 'BALBTC', 'BANDBTC', 'BARBTC', 'BATBTC', 'BCDBTC', 'BEAMBTC', 'BELBTC', 'BETABTC', 'BLZBTC', 'BNBBTC', 'BNTBTC', 'BNXBTC', 'BONDBTC', 'BRDBTC', 'BTCSTBTC', 'BTGBTC', 'BTSBTC', 'BZRXBTC', 'C98BTC', 'CAKEBTC', 'CELOBTC', 'CELRBTC', 'CFXBTC', 'CHESSBTC', 'CHRBTC', 'CHZBTC', 'CITYBTC', 'CKBBTC', 'CLVBTC', 'CNDBTC', 'COMPBTC', 'COSBTC', 'COTIBTC', 'CRVBTC', 'CTKBTC', 'CTSIBTC', 'CTXCBTC', 'CVCBTC', 'DARBTC', 'DASHBTC', 'DATABTC', 'DCRBTC', 'DEGOBTC', 'DGBBTC', 'DIABTC', 'DNTBTC', 'DOCKBTC', 'DODOBTC', 'DOGEBTC', 'DOTBTC', 'DREPBTC', 'DUSKBTC', 'DYDXBTC', 'EGLDBTC', 'ELFBTC', 'ENJBTC', 'ENSBTC', 'EOSBTC', 'EPSBTC', 'ETCBTC', 'ETHBTC', 'EVXBTC', 'EZBTC', 'FARMBTC', 'FETBTC', 'FIDABTC', 'FILBTC', 'FIOBTC', 'FIROBTC', 'FISBTC', 'FLMBTC', 'FLOWBTC', 'FORBTC', 'FORTHBTC', 'FRONTBTC', 'FTMBTC', 'FTTBTC', 'FUNBTC', 'FXSBTC', 'GALABTC', 'GASBTC', 'GLMBTC', 'GNOBTC', 'GOBTC', 'GRSBTC', 'GRTBTC', 'GTCBTC', 'GTOBTC', 'GXSBTC', 'HARDBTC', 'HBARBTC', 'HIVEBTC', 'HNTBTC', 'ICPBTC', 'ICXBTC', 'IDEXBTC', 'ILVBTC', 'INJBTC', 'IOSTBTC', 'IOTABTC', 'IOTXBTC', 'IRISBTC', 'JASMYBTC', 'JSTBTC', 'JUVBTC', 'KAVABTC', 'KEEPBTC', 'KLAYBTC', 'KMDBTC', 'KNCBTC', 'KSMBTC', 'LAZIOBTC', 'LINABTC', 'LINKBTC', 'LITBTC', 'LOOMBTC', 'LPTBTC', 'LRCBTC', 'LSKBTC', 'LTCBTC', 'LTOBTC', 'LUNABTC', 'MANABTC', 'MATICBTC', 'MBOXBTC', 'MDABTC', 'MDTBTC', 'MDXBTC', 'MINABTC', 'MIRBTC', 'MITHBTC', 'MKRBTC', 'MLNBTC', 'MOVRBTC', 'MTHBTC', 'MTLBTC', 'NANOBTC', 'NASBTC', 'NAVBTC', 'NEARBTC', 'NEBLBTC', 'NEOBTC', 'NKNBTC', 'NMRBTC', 'NUBTC', 'NULSBTC', 'NXSBTC', 'OAXBTC', 'OCEANBTC', 'OGBTC', 'OGNBTC', 'OMBTC', 'OMGBTC', 'ONEBTC', 'ONGBTC', 
'ONTBTC', 'ORNBTC', 'OXTBTC', 'PAXGBTC', 'PERLBTC', 'PERPBTC', 'PHABTC', 'PHBBTC', 'PIVXBTC', 'PNTBTC', 'POLSBTC', 'POLYBTC', 'PONDBTC', 'PORTOBTC', 'POWRBTC', 'PROMBTC', 'PSGBTC', 'QIBTC', 'QKCBTC', 'QLCBTC', 'QNTBTC', 'QSPBTC', 'QTUMBTC', 'QUICKBTC', 'RADBTC', 'RAMPBTC', 'RAREBTC', 'RDNBTC', 'REEFBTC', 'RENBTC', 'RENBTCBTC', 'REPBTC', 'REQBTC', 'RGTBTC', 'RIFBTC', 'RLCBTC', 'ROSEBTC', 'RSRBTC', 'RUNEBTC', 'RVNBTC', 'SANDBTC', 'SCBTC', 'SCRTBTC', 'SFPBTC', 'SKLBTC', 'SNMBTC', 'SNTBTC', 'SNXBTC', 'SOLBTC', 'SRMBTC', 'SSVBTC', 'STEEMBTC', 'STMXBTC', 'STORJBTC', 'STPTBTC', 'STRAXBTC', 'STXBTC', 'SUPERBTC', 'SUSHIBTC', 'SXPBTC', 'SYSBTC', 'TCTBTC', 'TFUELBTC', 'THETABTC', 'TKOBTC', 'TLMBTC', 'TOMOBTC', 'TORNBTC', 'TRBBTC', 'TRIBEBTC', 'TRUBTC', 'TRXBTC', 'TVKBTC', 'TWTBTC', 'UMABTC', 'UNFIBTC', 'UNIBTC', 'UTKBTC', 'VETBTC', 'VGXBTC', 'VIBBTC', 'VIDTBTC', 'VITEBTC', 'WABIBTC', 'WANBTC', 'WAVESBTC', 'WAXPBTC', 'WBTCBTC', 'WINGBTC', 'WNXMBTC', 'WRXBTC', 'WTCBTC', 'XEMBTC', 'XLMBTC', 'XMRBTC', 'XRPBTC', 'XTZBTC', 'XVGBTC', 'XVSBTC', 'YFIBTC', 'YFIIBTC', 'YGGBTC', 'YOYOBTC', 'ZECBTC', 'ZENBTC', 'ZILBTC', 'ZRXBTC']

下面是使用 python-binance 向 Binance API 服务器并发发送多个请求的方法:
import asyncio
from binance import AsyncClient

# Shared module-level accumulator: each klines response that is fetched gets
# appended to this list.
RESULTS = []  # let's store all results here


class GetAllBinanceData:
    """Fetch klines for many symbols with a pool of asyncio worker coroutines.

    One producer coroutine feeds symbols into a bounded queue while
    ``workers_num`` consumer coroutines take symbols off the queue, request
    their klines through a shared ``AsyncClient`` and append every response
    to the module-level ``RESULTS`` list.
    """

    def __init__(self, workers_num: int = 10):
        """Store the worker count and create the bounded task queue."""
        self.workers_num: int = workers_num
        # Bounded queue: the producer waits while 10 symbols are pending.
        self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def get_symbols_from_somewhere(self):
        """Queue every symbol, followed by one ``None`` sentinel per worker."""
        # imagine the symbols are from some file
        symbols = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
        for symbol in symbols:
            await self.task_q.put(symbol)

        # Every worker stops at the first None it receives, so one sentinel
        # per worker shuts all of them down.
        for _ in range(self.workers_num):
            await self.task_q.put(None)

    async def get_historical_klines(self, client: AsyncClient):
        """Consume symbols from the queue until a ``None`` sentinel arrives.

        For each symbol the klines are requested from ``client``, printed and
        appended to ``RESULTS``.
        """
        while True:
            symbol = await self.task_q.get()
            if symbol is None:
                break
            klines = await client.get_historical_klines(
                symbol=symbol,
                interval=AsyncClient.KLINE_INTERVAL_1MINUTE,
                start_str="2021-11-23 10:00:00",
                end_str="2021-11-23 10:01:00"
            )
            print(klines)  # just print
            RESULTS.append(klines)  # send somewhere else

    async def amain(self) -> None:
        """Create the client, run the producer and all workers, then close it."""
        client = await AsyncClient.create()
        try:
            await asyncio.gather(
                self.get_symbols_from_somewhere(),
                *[self.get_historical_klines(client) for _ in range(self.workers_num)]
            )
        finally:
            # Close the connection even when one of the gathered coroutines
            # raised; otherwise the client would be left open.
            await client.close_connection()


if __name__ == "__main__":
    async def _run() -> None:
        """Build the fetcher inside the running loop and await its entry point."""
        # The object (and the asyncio.Queue it creates) is constructed while
        # the loop is already running; before Python 3.10 a Queue binds to the
        # event loop that is current at construction time.
        await GetAllBinanceData().amain()

    # asyncio.run() creates the loop, runs the coroutine and closes the loop
    # afterwards; calling get_event_loop() without a running loop is
    # deprecated and left the loop unclosed.
    asyncio.run(_run())
    print("*" * 100)
    print(RESULTS)

另一种非常直接但效率较低的方法。请注意,当您创建多个连接时,Binance 可能会不满,并开始忽略您的请求。

from binance import Client
from concurrent.futures import ThreadPoolExecutor

# Shared module-level accumulator for the fetched klines responses.
RESULTS = []  # let's store all results here
# Demo input: the same three symbols repeated 100 times (300 entries).
SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100


def get_historical_klines(symbol):
    """Fetch the 1-minute klines of ``symbol`` and record the response.

    A dedicated ``Client`` is created for the call and closed afterwards.
    The response is printed and appended to the module-level ``RESULTS``
    list.

    Args:
        symbol: trading pair passed on to ``Client.get_historical_klines``.
    """
    # Create the client before entering ``try``: when the constructor raises
    # there is nothing to close, and a ``finally`` clause touching the still
    # unbound name would replace the real error with an UnboundLocalError.
    client = Client()
    try:
        klines = client.get_historical_klines(
            symbol=symbol,
            interval=Client.KLINE_INTERVAL_1MINUTE,
            start_str="2021-11-23 10:00:00",
            end_str="2021-11-23 10:01:00"
        )
        print(klines)  # just print
        RESULTS.append(klines)  # send somewhere else
    finally:
        client.close_connection()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=10) as pool:
        # Keep one future per symbol. The iterator returned by pool.map() was
        # never consumed, so exceptions raised in the workers were silently
        # discarded.
        submitted = [
            (symbol, pool.submit(get_historical_klines, symbol))
            for symbol in SYMBOLS
        ]

    # Leaving the with-block has waited for every future; report the failed
    # requests instead of hiding them.
    for symbol, future in submitted:
        error = future.exception()
        if error is not None:
            print(f"{symbol} failed: {error!r}")

    print("*" * 100)
    print(len(RESULTS))