如何把 for 循环的每次迭代放到线程中运行,从而加快 for 循环的速度
how to run for loop as threads so it speeds up the for loop
我有一个 for 循环,它在其中获取每个键的历史数据(historical_data 函数)。现在我有大约200个加密货币,所以要一个一个地获取历史数据需要很长时间。现在我想知道是否可以在单独的线程中获取每个交易品种的历史数据。
代码会遍历每个交易对和每个时间间隔,然后为各列命名并删除一些用不到的列。每次循环结束后,数据都会追加到列表中。如果这些请求必须一个接一个地运行,就会花费很长时间,所以我想把它们放到线程中运行,但不知道具体该怎么做。
提前致谢!
class crypto:
    """Load BTC-quoted symbols from ``allsymbols`` and fetch their recent candles."""

    # Plain read mode is enough: the file is only iterated, and 'r+' would
    # additionally require write permission on it.
    with open('allsymbols', 'r') as f:
        # One base asset per line. Blank lines are skipped so they do not
        # become a bare 'BTC' symbol.
        symbols = [line.strip() + 'BTC' for line in f if line.strip()]

    intervals = ['1m', '5m']

    # Names for the 12 fields of one kline row; the numbered ones are discarded.
    _KLINE_COLUMNS = ['time', '1', 'high', 'low', 'close', '5', '6', '7', '8', '9', '10', '11']
    _KEPT_COLUMNS = ['time', 'high', 'low', 'close']

    @staticmethod
    def historical_data(start_str='11/19/2021'):
        """Download recent candles for every symbol/interval combination.

        Relies on a module-level ``client`` object that provides
        ``get_historical_klines``.

        Args:
            start_str: Start date handed to ``client.get_historical_klines``.

        Returns:
            A list with one ``DataFrame.to_dict()`` mapping per
            symbol/interval pair (symbols in the outer loop), holding the
            columns 'time', 'high', 'low', 'close', 'interval' and 'symbol'.
        """
        price_columns = ['high', 'low', 'close']
        historical_list = []
        for symbol in crypto.symbols:
            for interval in crypto.intervals:
                # The slice keeps up to 21 rows and leaves out the newest one
                # (presumably the still-open candle; confirm with the API docs).
                klines = client.get_historical_klines(symbol, interval, start_str, limit=1000)[-22:-1]
                frame = pd.DataFrame(klines, columns=crypto._KLINE_COLUMNS)[crypto._KEPT_COLUMNS].copy()
                frame['interval'] = interval
                frame['symbol'] = symbol
                # Convert whole columns at once instead of row by row.
                frame[price_columns] = frame[price_columns].apply(pd.to_numeric)
                # 'time' is in epoch milliseconds; parse it directly rather
                # than dividing by 1000 into floats first.
                frame['time'] = pd.to_datetime(frame['time'], unit='ms')
                historical_list.append(frame.to_dict())
        return historical_list
注意:我将列表转换为字典,然后再次将字典的值转换为列表。这可能会减慢进程很多?也许有一种更简单的方法可以做到这一点,但我不知道该怎么做。不过这不是重点。
@staticmethod
def refactor_list(historical_list):
    """Turn each column's ``{row_index: value}`` mapping into a list of values.

    Args:
        historical_list: List of dicts shaped like ``DataFrame.to_dict()``
            output. Every dict must contain the keys 'high', 'low', 'close',
            'interval', 'symbol' and 'time', each mapping to a dict.

    Returns:
        A new list holding one new dict per input entry. The six columns
        above become lists (in the mapping's iteration order); any other
        key is copied unchanged.

    Raises:
        KeyError: If an entry lacks one of the six columns.
    """
    columns = ('high', 'low', 'close', 'interval', 'symbol', 'time')
    # Build fresh dicts instead of overwriting the caller's. Rewriting the
    # input in place made a second call on the same list fail, because the
    # already-converted lists have no .values().
    return [
        {**entry, **{column: list(entry[column].values()) for column in columns}}
        for entry in historical_list
    ]
符号列表:
['1INCHBTC', 'AAVEBTC', 'ACMBTC', 'ADABTC', 'ADXBTC', 'AERGOBTC', 'AGIXBTC', 'AGLDBTC', 'AIONBTC', 'AKROBTC', 'ALGOBTC', 'ALICEBTC', 'ALPACABTC', 'ALPHABTC', 'AMBBTC', 'ANKRBTC', 'ANTBTC', 'APPCBTC', 'ARDRBTC', 'ARPABTC', 'ARBTC', 'ARKBTC', 'ASRBTC', 'ASTBTC', 'ATABTC', 'ATMBTC', 'ATOMBTC', 'AUCTIONBTC', 'AUDIOBTC', 'AUTOBTC', 'AVABTC', 'AVAXBTC', 'AXSBTC', 'BADGERBTC', 'BAKEBTC', 'BALBTC', 'BANDBTC', 'BARBTC', 'BATBTC', 'BCDBTC', 'BEAMBTC', 'BELBTC', 'BETABTC', 'BLZBTC', 'BNBBTC', 'BNTBTC', 'BNXBTC', 'BONDBTC', 'BRDBTC', 'BTCSTBTC', 'BTGBTC', 'BTSBTC', 'BZRXBTC', 'C98BTC', 'CAKEBTC', 'CELOBTC', 'CELRBTC', 'CFXBTC', 'CHESSBTC', 'CHRBTC', 'CHZBTC', 'CITYBTC', 'CKBBTC', 'CLVBTC', 'CNDBTC', 'COMPBTC', 'COSBTC', 'COTIBTC', 'CRVBTC', 'CTKBTC', 'CTSIBTC', 'CTXCBTC', 'CVCBTC', 'DARBTC', 'DASHBTC', 'DATABTC', 'DCRBTC', 'DEGOBTC', 'DGBBTC', 'DIABTC', 'DNTBTC', 'DOCKBTC', 'DODOBTC', 'DOGEBTC', 'DOTBTC', 'DREPBTC', 'DUSKBTC', 'DYDXBTC', 'EGLDBTC', 'ELFBTC', 'ENJBTC', 'ENSBTC', 'EOSBTC', 'EPSBTC', 'ETCBTC', 'ETHBTC', 'EVXBTC', 'EZBTC', 'FARMBTC', 'FETBTC', 'FIDABTC', 'FILBTC', 'FIOBTC', 'FIROBTC', 'FISBTC', 'FLMBTC', 'FLOWBTC', 'FORBTC', 'FORTHBTC', 'FRONTBTC', 'FTMBTC', 'FTTBTC', 'FUNBTC', 'FXSBTC', 'GALABTC', 'GASBTC', 'GLMBTC', 'GNOBTC', 'GOBTC', 'GRSBTC', 'GRTBTC', 'GTCBTC', 'GTOBTC', 'GXSBTC', 'HARDBTC', 'HBARBTC', 'HIVEBTC', 'HNTBTC', 'ICPBTC', 'ICXBTC', 'IDEXBTC', 'ILVBTC', 'INJBTC', 'IOSTBTC', 'IOTABTC', 'IOTXBTC', 'IRISBTC', 'JASMYBTC', 'JSTBTC', 'JUVBTC', 'KAVABTC', 'KEEPBTC', 'KLAYBTC', 'KMDBTC', 'KNCBTC', 'KSMBTC', 'LAZIOBTC', 'LINABTC', 'LINKBTC', 'LITBTC', 'LOOMBTC', 'LPTBTC', 'LRCBTC', 'LSKBTC', 'LTCBTC', 'LTOBTC', 'LUNABTC', 'MANABTC', 'MATICBTC', 'MBOXBTC', 'MDABTC', 'MDTBTC', 'MDXBTC', 'MINABTC', 'MIRBTC', 'MITHBTC', 'MKRBTC', 'MLNBTC', 'MOVRBTC', 'MTHBTC', 'MTLBTC', 'NANOBTC', 'NASBTC', 'NAVBTC', 'NEARBTC', 'NEBLBTC', 'NEOBTC', 'NKNBTC', 'NMRBTC', 'NUBTC', 'NULSBTC', 'NXSBTC', 'OAXBTC', 'OCEANBTC', 'OGBTC', 'OGNBTC', 'OMBTC', 'OMGBTC', 'ONEBTC', 'ONGBTC', 
'ONTBTC', 'ORNBTC', 'OXTBTC', 'PAXGBTC', 'PERLBTC', 'PERPBTC', 'PHABTC', 'PHBBTC', 'PIVXBTC', 'PNTBTC', 'POLSBTC', 'POLYBTC', 'PONDBTC', 'PORTOBTC', 'POWRBTC', 'PROMBTC', 'PSGBTC', 'QIBTC', 'QKCBTC', 'QLCBTC', 'QNTBTC', 'QSPBTC', 'QTUMBTC', 'QUICKBTC', 'RADBTC', 'RAMPBTC', 'RAREBTC', 'RDNBTC', 'REEFBTC', 'RENBTC', 'RENBTCBTC', 'REPBTC', 'REQBTC', 'RGTBTC', 'RIFBTC', 'RLCBTC', 'ROSEBTC', 'RSRBTC', 'RUNEBTC', 'RVNBTC', 'SANDBTC', 'SCBTC', 'SCRTBTC', 'SFPBTC', 'SKLBTC', 'SNMBTC', 'SNTBTC', 'SNXBTC', 'SOLBTC', 'SRMBTC', 'SSVBTC', 'STEEMBTC', 'STMXBTC', 'STORJBTC', 'STPTBTC', 'STRAXBTC', 'STXBTC', 'SUPERBTC', 'SUSHIBTC', 'SXPBTC', 'SYSBTC', 'TCTBTC', 'TFUELBTC', 'THETABTC', 'TKOBTC', 'TLMBTC', 'TOMOBTC', 'TORNBTC', 'TRBBTC', 'TRIBEBTC', 'TRUBTC', 'TRXBTC', 'TVKBTC', 'TWTBTC', 'UMABTC', 'UNFIBTC', 'UNIBTC', 'UTKBTC', 'VETBTC', 'VGXBTC', 'VIBBTC', 'VIDTBTC', 'VITEBTC', 'WABIBTC', 'WANBTC', 'WAVESBTC', 'WAXPBTC', 'WBTCBTC', 'WINGBTC', 'WNXMBTC', 'WRXBTC', 'WTCBTC', 'XEMBTC', 'XLMBTC', 'XMRBTC', 'XRPBTC', 'XTZBTC', 'XVGBTC', 'XVSBTC', 'YFIBTC', 'YFIIBTC', 'YGGBTC', 'YOYOBTC', 'ZECBTC', 'ZENBTC', 'ZILBTC', 'ZRXBTC']
下面是使用 python-binance 向 Binance API 服务器并发发送多个请求的方法:
import asyncio
from binance import AsyncClient
# Shared accumulator: every worker coroutine appends the klines it fetched.
RESULTS = [] # let's store all results here
class GetAllBinanceData:
    """Fetch klines for many symbols concurrently through one shared client.

    One producer coroutine feeds symbols into a bounded queue while
    ``workers_num`` worker coroutines consume them, each awaiting a single
    request at a time.
    """

    def __init__(self, workers_num: int = 10):
        """Store the worker count and create the bounded task queue.

        Args:
            workers_num: Number of concurrent worker coroutines (at least 1).

        Raises:
            ValueError: If ``workers_num`` is smaller than 1.
        """
        # With no consumer the producer would block forever as soon as the
        # bounded queue is full, so reject that configuration up front.
        if workers_num < 1:
            raise ValueError("workers_num must be at least 1")
        self.workers_num: int = workers_num
        self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def get_symbols_from_somewhere(self):
        """Get symbols and distribute them among workers"""
        # imagine the symbols are from some file
        symbols = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
        for symbol in symbols:
            await self.task_q.put(symbol)
        # A worker stops at the first None it receives, so one sentinel per
        # worker is needed to shut all of them down.
        for _ in range(self.workers_num):
            await self.task_q.put(None)

    async def get_historical_klines(self, client: "AsyncClient"):
        """Consume symbols from the queue, fetch their klines and record them.

        Runs until a ``None`` sentinel is taken from the queue. Each result
        is printed and appended to the module-level ``RESULTS`` list.

        Args:
            client: Client used for the ``get_historical_klines`` requests.
        """
        while True:
            symbol = await self.task_q.get()
            if symbol is None:
                break
            klines = await client.get_historical_klines(
                symbol=symbol,
                interval=AsyncClient.KLINE_INTERVAL_1MINUTE,
                start_str="2021-11-23 10:00:00",
                end_str="2021-11-23 10:01:00"
            )
            print(klines)  # just print
            RESULTS.append(klines)  # send somewhere else

    async def amain(self) -> None:
        """Main async wrapper function: run the producer and all workers."""
        client = await AsyncClient.create()
        try:
            await asyncio.gather(
                self.get_symbols_from_somewhere(),
                *[self.get_historical_klines(client) for _ in range(self.workers_num)]
            )
        finally:
            # Close the connection even when a request fails, so the client
            # session is not leaked.
            await client.close_connection()
if __name__ == "__main__":
    async def _run() -> None:
        """Create the fetcher inside the running loop and drive it to completion."""
        # Building the instance here means its asyncio.Queue is created in
        # the loop that uses it (a Queue binds to the current loop at
        # construction time on Python < 3.10).
        await GetAllBinanceData().amain()

    # asyncio.run() creates and closes its own event loop; calling
    # asyncio.get_event_loop() without a running loop is deprecated.
    asyncio.run(_run())
    print("*" * 100)
    print(RESULTS)
另一种非常直接但效率较低的方法。请注意,如果您创建多个连接,Binance 可能会不满,并开始拒绝您的请求。
from binance import Client
from concurrent.futures import ThreadPoolExecutor
# Shared accumulator: every worker thread appends the klines it fetched.
RESULTS = [] # let's store all results here
# Demo workload: three symbols repeated 100 times, i.e. 300 requests in total.
SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
def get_historical_klines(symbol):
    """Fetch one minute of klines for ``symbol`` and append them to ``RESULTS``.

    A dedicated ``Client`` is created for the call and closed afterwards,
    whether or not the request succeeds.

    Args:
        symbol: Trading pair to query, e.g. ``"BNBBTC"``.
    """
    # Create the client before entering the try block. If construction fails
    # there is nothing to close, and the real error must not be replaced by
    # an UnboundLocalError raised from the finally clause.
    client = Client()
    try:
        klines = client.get_historical_klines(
            symbol=symbol,
            interval=Client.KLINE_INTERVAL_1MINUTE,
            start_str="2021-11-23 10:00:00",
            end_str="2021-11-23 10:01:00"
        )
        print(klines)  # just print
        RESULTS.append(klines)  # send somewhere else
    finally:
        client.close_connection()
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=10) as pool:
        submitted = [
            (symbol, pool.submit(get_historical_klines, symbol))
            for symbol in SYMBOLS
        ]
    # Leaving the with-block waits for every task, so all futures are done.
    # Report failures explicitly: an exception raised in a worker stays
    # hidden unless the corresponding result is inspected.
    for symbol, future in submitted:
        error = future.exception()
        if error is not None:
            print(f"{symbol} failed: {error!r}")
    print("*" * 100)
    print(len(RESULTS))
我有一个 for 循环,它在其中获取每个键的历史数据(historical_data 函数)。现在我有大约200个加密货币,所以要一个一个地获取历史数据需要很长时间。现在我想知道是否可以在单独的线程中获取每个交易品种的历史数据。
代码会遍历每个交易对和每个时间间隔,然后为各列命名并删除一些用不到的列。每次循环结束后,数据都会追加到列表中。如果这些请求必须一个接一个地运行,就会花费很长时间,所以我想把它们放到线程中运行,但不知道具体该怎么做。
提前致谢!
class crypto:
    """Load BTC-quoted symbols from ``allsymbols`` and fetch their recent candles."""

    # Plain read mode is enough: the file is only iterated, and 'r+' would
    # additionally require write permission on it.
    with open('allsymbols', 'r') as f:
        # One base asset per line. Blank lines are skipped so they do not
        # become a bare 'BTC' symbol.
        symbols = [line.strip() + 'BTC' for line in f if line.strip()]

    intervals = ['1m', '5m']

    # Names for the 12 fields of one kline row; the numbered ones are discarded.
    _KLINE_COLUMNS = ['time', '1', 'high', 'low', 'close', '5', '6', '7', '8', '9', '10', '11']
    _KEPT_COLUMNS = ['time', 'high', 'low', 'close']

    @staticmethod
    def historical_data(start_str='11/19/2021'):
        """Download recent candles for every symbol/interval combination.

        Relies on a module-level ``client`` object that provides
        ``get_historical_klines``.

        Args:
            start_str: Start date handed to ``client.get_historical_klines``.

        Returns:
            A list with one ``DataFrame.to_dict()`` mapping per
            symbol/interval pair (symbols in the outer loop), holding the
            columns 'time', 'high', 'low', 'close', 'interval' and 'symbol'.
        """
        price_columns = ['high', 'low', 'close']
        historical_list = []
        for symbol in crypto.symbols:
            for interval in crypto.intervals:
                # The slice keeps up to 21 rows and leaves out the newest one
                # (presumably the still-open candle; confirm with the API docs).
                klines = client.get_historical_klines(symbol, interval, start_str, limit=1000)[-22:-1]
                frame = pd.DataFrame(klines, columns=crypto._KLINE_COLUMNS)[crypto._KEPT_COLUMNS].copy()
                frame['interval'] = interval
                frame['symbol'] = symbol
                # Convert whole columns at once instead of row by row.
                frame[price_columns] = frame[price_columns].apply(pd.to_numeric)
                # 'time' is in epoch milliseconds; parse it directly rather
                # than dividing by 1000 into floats first.
                frame['time'] = pd.to_datetime(frame['time'], unit='ms')
                historical_list.append(frame.to_dict())
        return historical_list
注意:我将列表转换为字典,然后再次将字典的值转换为列表。这可能会减慢进程很多?也许有一种更简单的方法可以做到这一点,但我不知道该怎么做。不过这不是重点。
@staticmethod
def refactor_list(historical_list):
    """Turn each column's ``{row_index: value}`` mapping into a list of values.

    Args:
        historical_list: List of dicts shaped like ``DataFrame.to_dict()``
            output. Every dict must contain the keys 'high', 'low', 'close',
            'interval', 'symbol' and 'time', each mapping to a dict.

    Returns:
        A new list holding one new dict per input entry. The six columns
        above become lists (in the mapping's iteration order); any other
        key is copied unchanged.

    Raises:
        KeyError: If an entry lacks one of the six columns.
    """
    columns = ('high', 'low', 'close', 'interval', 'symbol', 'time')
    # Build fresh dicts instead of overwriting the caller's. Rewriting the
    # input in place made a second call on the same list fail, because the
    # already-converted lists have no .values().
    return [
        {**entry, **{column: list(entry[column].values()) for column in columns}}
        for entry in historical_list
    ]
符号列表:
['1INCHBTC', 'AAVEBTC', 'ACMBTC', 'ADABTC', 'ADXBTC', 'AERGOBTC', 'AGIXBTC', 'AGLDBTC', 'AIONBTC', 'AKROBTC', 'ALGOBTC', 'ALICEBTC', 'ALPACABTC', 'ALPHABTC', 'AMBBTC', 'ANKRBTC', 'ANTBTC', 'APPCBTC', 'ARDRBTC', 'ARPABTC', 'ARBTC', 'ARKBTC', 'ASRBTC', 'ASTBTC', 'ATABTC', 'ATMBTC', 'ATOMBTC', 'AUCTIONBTC', 'AUDIOBTC', 'AUTOBTC', 'AVABTC', 'AVAXBTC', 'AXSBTC', 'BADGERBTC', 'BAKEBTC', 'BALBTC', 'BANDBTC', 'BARBTC', 'BATBTC', 'BCDBTC', 'BEAMBTC', 'BELBTC', 'BETABTC', 'BLZBTC', 'BNBBTC', 'BNTBTC', 'BNXBTC', 'BONDBTC', 'BRDBTC', 'BTCSTBTC', 'BTGBTC', 'BTSBTC', 'BZRXBTC', 'C98BTC', 'CAKEBTC', 'CELOBTC', 'CELRBTC', 'CFXBTC', 'CHESSBTC', 'CHRBTC', 'CHZBTC', 'CITYBTC', 'CKBBTC', 'CLVBTC', 'CNDBTC', 'COMPBTC', 'COSBTC', 'COTIBTC', 'CRVBTC', 'CTKBTC', 'CTSIBTC', 'CTXCBTC', 'CVCBTC', 'DARBTC', 'DASHBTC', 'DATABTC', 'DCRBTC', 'DEGOBTC', 'DGBBTC', 'DIABTC', 'DNTBTC', 'DOCKBTC', 'DODOBTC', 'DOGEBTC', 'DOTBTC', 'DREPBTC', 'DUSKBTC', 'DYDXBTC', 'EGLDBTC', 'ELFBTC', 'ENJBTC', 'ENSBTC', 'EOSBTC', 'EPSBTC', 'ETCBTC', 'ETHBTC', 'EVXBTC', 'EZBTC', 'FARMBTC', 'FETBTC', 'FIDABTC', 'FILBTC', 'FIOBTC', 'FIROBTC', 'FISBTC', 'FLMBTC', 'FLOWBTC', 'FORBTC', 'FORTHBTC', 'FRONTBTC', 'FTMBTC', 'FTTBTC', 'FUNBTC', 'FXSBTC', 'GALABTC', 'GASBTC', 'GLMBTC', 'GNOBTC', 'GOBTC', 'GRSBTC', 'GRTBTC', 'GTCBTC', 'GTOBTC', 'GXSBTC', 'HARDBTC', 'HBARBTC', 'HIVEBTC', 'HNTBTC', 'ICPBTC', 'ICXBTC', 'IDEXBTC', 'ILVBTC', 'INJBTC', 'IOSTBTC', 'IOTABTC', 'IOTXBTC', 'IRISBTC', 'JASMYBTC', 'JSTBTC', 'JUVBTC', 'KAVABTC', 'KEEPBTC', 'KLAYBTC', 'KMDBTC', 'KNCBTC', 'KSMBTC', 'LAZIOBTC', 'LINABTC', 'LINKBTC', 'LITBTC', 'LOOMBTC', 'LPTBTC', 'LRCBTC', 'LSKBTC', 'LTCBTC', 'LTOBTC', 'LUNABTC', 'MANABTC', 'MATICBTC', 'MBOXBTC', 'MDABTC', 'MDTBTC', 'MDXBTC', 'MINABTC', 'MIRBTC', 'MITHBTC', 'MKRBTC', 'MLNBTC', 'MOVRBTC', 'MTHBTC', 'MTLBTC', 'NANOBTC', 'NASBTC', 'NAVBTC', 'NEARBTC', 'NEBLBTC', 'NEOBTC', 'NKNBTC', 'NMRBTC', 'NUBTC', 'NULSBTC', 'NXSBTC', 'OAXBTC', 'OCEANBTC', 'OGBTC', 'OGNBTC', 'OMBTC', 'OMGBTC', 'ONEBTC', 'ONGBTC', 
'ONTBTC', 'ORNBTC', 'OXTBTC', 'PAXGBTC', 'PERLBTC', 'PERPBTC', 'PHABTC', 'PHBBTC', 'PIVXBTC', 'PNTBTC', 'POLSBTC', 'POLYBTC', 'PONDBTC', 'PORTOBTC', 'POWRBTC', 'PROMBTC', 'PSGBTC', 'QIBTC', 'QKCBTC', 'QLCBTC', 'QNTBTC', 'QSPBTC', 'QTUMBTC', 'QUICKBTC', 'RADBTC', 'RAMPBTC', 'RAREBTC', 'RDNBTC', 'REEFBTC', 'RENBTC', 'RENBTCBTC', 'REPBTC', 'REQBTC', 'RGTBTC', 'RIFBTC', 'RLCBTC', 'ROSEBTC', 'RSRBTC', 'RUNEBTC', 'RVNBTC', 'SANDBTC', 'SCBTC', 'SCRTBTC', 'SFPBTC', 'SKLBTC', 'SNMBTC', 'SNTBTC', 'SNXBTC', 'SOLBTC', 'SRMBTC', 'SSVBTC', 'STEEMBTC', 'STMXBTC', 'STORJBTC', 'STPTBTC', 'STRAXBTC', 'STXBTC', 'SUPERBTC', 'SUSHIBTC', 'SXPBTC', 'SYSBTC', 'TCTBTC', 'TFUELBTC', 'THETABTC', 'TKOBTC', 'TLMBTC', 'TOMOBTC', 'TORNBTC', 'TRBBTC', 'TRIBEBTC', 'TRUBTC', 'TRXBTC', 'TVKBTC', 'TWTBTC', 'UMABTC', 'UNFIBTC', 'UNIBTC', 'UTKBTC', 'VETBTC', 'VGXBTC', 'VIBBTC', 'VIDTBTC', 'VITEBTC', 'WABIBTC', 'WANBTC', 'WAVESBTC', 'WAXPBTC', 'WBTCBTC', 'WINGBTC', 'WNXMBTC', 'WRXBTC', 'WTCBTC', 'XEMBTC', 'XLMBTC', 'XMRBTC', 'XRPBTC', 'XTZBTC', 'XVGBTC', 'XVSBTC', 'YFIBTC', 'YFIIBTC', 'YGGBTC', 'YOYOBTC', 'ZECBTC', 'ZENBTC', 'ZILBTC', 'ZRXBTC']
下面是使用 python-binance 向 Binance API 服务器并发发送多个请求的方法:
import asyncio
from binance import AsyncClient
# Shared accumulator: every worker coroutine appends the klines it fetched.
RESULTS = [] # let's store all results here
class GetAllBinanceData:
    """Fetch klines for many symbols concurrently through one shared client.

    One producer coroutine feeds symbols into a bounded queue while
    ``workers_num`` worker coroutines consume them, each awaiting a single
    request at a time.
    """

    def __init__(self, workers_num: int = 10):
        """Store the worker count and create the bounded task queue.

        Args:
            workers_num: Number of concurrent worker coroutines (at least 1).

        Raises:
            ValueError: If ``workers_num`` is smaller than 1.
        """
        # With no consumer the producer would block forever as soon as the
        # bounded queue is full, so reject that configuration up front.
        if workers_num < 1:
            raise ValueError("workers_num must be at least 1")
        self.workers_num: int = workers_num
        self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def get_symbols_from_somewhere(self):
        """Get symbols and distribute them among workers"""
        # imagine the symbols are from some file
        symbols = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
        for symbol in symbols:
            await self.task_q.put(symbol)
        # A worker stops at the first None it receives, so one sentinel per
        # worker is needed to shut all of them down.
        for _ in range(self.workers_num):
            await self.task_q.put(None)

    async def get_historical_klines(self, client: "AsyncClient"):
        """Consume symbols from the queue, fetch their klines and record them.

        Runs until a ``None`` sentinel is taken from the queue. Each result
        is printed and appended to the module-level ``RESULTS`` list.

        Args:
            client: Client used for the ``get_historical_klines`` requests.
        """
        while True:
            symbol = await self.task_q.get()
            if symbol is None:
                break
            klines = await client.get_historical_klines(
                symbol=symbol,
                interval=AsyncClient.KLINE_INTERVAL_1MINUTE,
                start_str="2021-11-23 10:00:00",
                end_str="2021-11-23 10:01:00"
            )
            print(klines)  # just print
            RESULTS.append(klines)  # send somewhere else

    async def amain(self) -> None:
        """Main async wrapper function: run the producer and all workers."""
        client = await AsyncClient.create()
        try:
            await asyncio.gather(
                self.get_symbols_from_somewhere(),
                *[self.get_historical_klines(client) for _ in range(self.workers_num)]
            )
        finally:
            # Close the connection even when a request fails, so the client
            # session is not leaked.
            await client.close_connection()
if __name__ == "__main__":
    async def _run() -> None:
        """Create the fetcher inside the running loop and drive it to completion."""
        # Building the instance here means its asyncio.Queue is created in
        # the loop that uses it (a Queue binds to the current loop at
        # construction time on Python < 3.10).
        await GetAllBinanceData().amain()

    # asyncio.run() creates and closes its own event loop; calling
    # asyncio.get_event_loop() without a running loop is deprecated.
    asyncio.run(_run())
    print("*" * 100)
    print(RESULTS)
另一种非常直接但效率较低的方法。请注意,如果您创建多个连接,Binance 可能会不满,并开始拒绝您的请求。
from binance import Client
from concurrent.futures import ThreadPoolExecutor
# Shared accumulator: every worker thread appends the klines it fetched.
RESULTS = [] # let's store all results here
# Demo workload: three symbols repeated 100 times, i.e. 300 requests in total.
SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
def get_historical_klines(symbol):
    """Fetch one minute of klines for ``symbol`` and append them to ``RESULTS``.

    A dedicated ``Client`` is created for the call and closed afterwards,
    whether or not the request succeeds.

    Args:
        symbol: Trading pair to query, e.g. ``"BNBBTC"``.
    """
    # Create the client before entering the try block. If construction fails
    # there is nothing to close, and the real error must not be replaced by
    # an UnboundLocalError raised from the finally clause.
    client = Client()
    try:
        klines = client.get_historical_klines(
            symbol=symbol,
            interval=Client.KLINE_INTERVAL_1MINUTE,
            start_str="2021-11-23 10:00:00",
            end_str="2021-11-23 10:01:00"
        )
        print(klines)  # just print
        RESULTS.append(klines)  # send somewhere else
    finally:
        client.close_connection()
if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=10) as pool:
        submitted = [
            (symbol, pool.submit(get_historical_klines, symbol))
            for symbol in SYMBOLS
        ]
    # Leaving the with-block waits for every task, so all futures are done.
    # Report failures explicitly: an exception raised in a worker stays
    # hidden unless the corresponding result is inspected.
    for symbol, future in submitted:
        error = future.exception()
        if error is not None:
            print(f"{symbol} failed: {error!r}")
    print("*" * 100)
    print(len(RESULTS))