如何独立运行 websockets
How to run websockets independently
我尝试启动 Binance websocket 来收集 K 线(蜡烛图)数据。如果数据处理函数没有延迟,它工作得很好。但是当处理某个交易对(ticker)数据的函数出现停顿时,其他交易对的响应也会被延迟。有人知道如何让它们独立运行吗?
from binance.client import Client
from binance.websockets import BinanceSocketManager
# Binance credentials; left blank in this example.
api_key, api_secret = '', ''

# REST client, plus the websocket manager the kline streams register with.
client = Client(api_key=api_key, api_secret=api_secret)
bm = BinanceSocketManager(client, user_timeout=60)
def process(msg):
    """Print the symbol of a kline message, stalling 5 seconds for ETHUSDT.

    The sleep simulates slow per-symbol processing.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    # `time` is not imported anywhere in this snippet, so import it locally
    # instead of failing with NameError on the first ETHUSDT message.
    import time

    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)
def socket_1():
    """Open the 1-hour ETHUSDT kline stream, delivering messages to ``process``.

    Returns:
        The value returned by ``bm.start_kline_socket`` (the connection key).
        It was previously bound to an unused local and lost.
    """
    return bm.start_kline_socket('ETHUSDT', process, '1h')
def socket_2():
    """Open the 1-hour BNBUSDT kline stream, delivering messages to ``process``.

    Returns:
        The value returned by ``bm.start_kline_socket`` (the connection key).
        It was previously bound to an unused local and lost.
    """
    return bm.start_kline_socket('BNBUSDT', process, '1h')
# Register both kline streams first, then start the socket manager.
for open_stream in (socket_1, socket_2):
    open_stream()
bm.start()
我尝试按照 @Mike Malyi 的建议使用 asyncio,让两个套接字作为两个单独的任务运行,但这并没有消除延迟:
import asyncio
def process(msg):
    """Websocket callback: hand the message to ``main`` without blocking.

    ``asyncio.run`` does not return until the coroutine it runs has finished,
    so calling it directly kept this callback busy for the whole 5-second
    ETHUSDT pause. Running it on a short-lived daemon thread lets the
    callback return immediately.

    Trade-off: messages are no longer guaranteed to finish in arrival order.

    Args:
        msg: Kline payload forwarded unchanged to ``main``.
    """
    import threading

    worker = threading.Thread(
        target=asyncio.run,
        args=(main(msg),),
        daemon=True,
    )
    worker.start()
async def main(msg):
    """Await ``process_message`` for a single kline message.

    The ETHUSDT and non-ETHUSDT branches were identical, and each wrapped the
    coroutine in a task only to await it straight away, so they are collapsed
    into one direct await.

    Args:
        msg: Kline payload passed through to ``process_message``.
    """
    await process_message(msg)
async def process_message(msg):
    """Print the message's symbol, then pause 5 seconds if it is ETHUSDT.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    symbol = msg['s']
    print(symbol)
    if symbol == 'ETHUSDT':
        # Non-blocking pause standing in for slow processing.
        await asyncio.sleep(5)
# Subscribe both symbols to 1-hour klines with the same callback, then start.
eth_key, bnb_key = (
    bm.start_kline_socket(symbol, process, '1h')
    for symbol in ('ETHUSDT', 'BNBUSDT')
)
bm.start()
我也尝试在线程(threads)中使用 Queue 让各个函数独立运行,但没有帮助,一个函数仍然会延迟另一个:
from queue import Queue
def consumer(in_q):
    """Forever take messages off ``in_q`` and run ``process_message`` on each.

    Messages are handled one at a time in the order ``get()`` returns them,
    so a slow message delays every message behind it.

    Args:
        in_q: Queue-like object whose ``get()`` yields the next message.
    """
    while True:
        process_message(in_q.get())
def producer(out_q):
    """Subscribe ETHUSDT and BNBUSDT 1-hour klines, enqueueing each message.

    Args:
        out_q: Queue whose ``put`` is used as the callback for both streams.

    Returns:
        Tuple ``(eth, bnb)`` of the values returned by
        ``bm.start_kline_socket``. They were previously bound to unused
        locals and lost.
    """
    return tuple(
        bm.start_kline_socket(symbol, out_q.put, '1h')
        for symbol in ('ETHUSDT', 'BNBUSDT')
    )
def process_message(msg):
    """Print the symbol with a timestamp; ETHUSDT is held for 5 seconds first.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    # `time` is not imported in this snippet, so import it locally instead of
    # failing with NameError.
    import time

    symbol = msg['s']
    if symbol == 'ETHUSDT':
        # Simulated slow processing.
        time.sleep(5)
        print(f"{symbol} with delay, {time.strftime('%X')}")
    else:
        print(f"{symbol} {time.strftime('%X')}")
# `Thread` was used here without being imported anywhere in the snippet,
# which fails with NameError.
from threading import Thread

# One shared queue: the producer's socket callbacks fill it and the consumer
# thread drains it.
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
bm.start()
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue
# Binance credentials; left blank in this example.
api_key, api_secret = '', ''
client = Client(api_key=api_key, api_secret=api_secret)
def process_message(msg):
    """Print the symbol with a timestamp; ETHUSDT then stalls for 5 seconds.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    symbol = msg['s']
    if symbol != 'ETHUSDT':
        print(f"{symbol} {time.strftime('%X')}")
        return

    print(f"{symbol} with delay, {time.strftime('%X')}")
    # Simulated slow processing.
    time.sleep(5)
    print('delay end')
def build_thread(symbol):
    """Run one symbol's kline stream and its message loop; never returns.

    Each call creates its own ``BinanceSocketManager`` and its own queue, so
    the messages of one symbol are processed by the loop in the calling
    thread only.

    Args:
        symbol: Trading pair to subscribe to, e.g. ``'ETHUSDT'``.
    """
    print('start thread', symbol)
    inbox = queue.Queue()
    manager = BinanceSocketManager(client, user_timeout=60)
    # The socket callback only enqueues; processing happens in the loop below.
    manager.start_kline_socket(symbol, inbox.put, '1h')
    manager.start()
    while True:
        process_message(inbox.get())
# Use joinable `threading.Thread` workers instead of bare `_thread` threads.
# With `_thread.start_new_thread` nothing waits for the workers: the main
# thread reaches the end of the script straight away, and whether the workers
# survive that is system defined.
import threading

workers = [
    threading.Thread(target=build_thread, args=(symbol,))
    for symbol in ('ETHUSDT', 'BNBUSDT')
]
for worker in workers:
    worker.start()
# build_thread loops forever, so this keeps the main thread alive with them.
for worker in workers:
    worker.join()
这段代码从 SQL 获取交易对和止损价位(这里为您提供了内联查询,因此代码可以直接运行),然后在收盘价低于止损价位时停止套接字。每个交易对都在自己的进程中运行,因此可以扩展到 CPU 可用线程的数量。
import config
from binance import ThreadedWebsocketManager
from datetime import datetime
import pyodbc
from multiprocessing import Pool, cpu_count
# Binance settings are read from the local `config` module.
KEY, SECRET, BASE_URL = (
    config.binance_key,
    config.binance_secret,
    config.binance_base_url,
)
''' ====== begin of functions ====== '''
def exec_sql(query):
    """Execute ``query`` on a fresh connection and commit it.

    The cursor and connection are now closed even when ``execute`` or
    ``commit`` raises; previously they leaked on error.

    Args:
        query: SQL text passed unchanged to ``cursor.execute``.
    """
    cnxn_p = pyodbc.connect(config.sql_connection)
    try:
        cursor_p = cnxn_p.cursor()
        try:
            cursor_p.execute(query)
            cnxn_p.commit()
        finally:
            cursor_p.close()
    finally:
        cnxn_p.close()
def process_message(pair, stop):
    """Stream klines for ``pair`` until a close price falls below ``stop``.

    Creates a dedicated ``ThreadedWebsocketManager``, prints every kline
    update, and calls ``twm.stop()`` the first time the close is below
    ``stop``. The function then waits in ``twm.join()``.

    Args:
        pair: Symbol to subscribe to, e.g. ``'BNBBTC'``.
        stop: Stop price compared against each kline's close.
    """
    print(pair)
    print(stop)
    twm = ThreadedWebsocketManager(api_key=KEY, api_secret=SECRET)
    # start is required to initialise its internal loop
    twm.start()

    def handle_socket_message(msg):
        """Print one kline update and stop the stream once close < stop."""
        # Handle error payloads first. The original read msg['k'] both before
        # and after the error check, so an error message without a 'k' entry
        # (python-binance sends e.g. {'e': 'error', 'm': ...}) raised KeyError
        # instead of being printed.
        if msg['e'] == 'error':
            print(msg)
            return

        kline = msg['k']
        # 'T' is divided by 1000 before conversion (milliseconds to seconds).
        transactiontime = datetime.fromtimestamp(
            kline['T'] / 1000
        ).strftime('%d %b %Y %H:%M:%S')
        print("{} - {} - Interval {} - Close: {} - Stop: {}".format(
            transactiontime, msg['s'], kline['i'], kline['c'], stop))

        if float(kline['c']) < stop:
            print(pair + ' close is below Stop')
            twm.stop()

    twm.start_kline_socket(callback=handle_socket_message, symbol=pair)
    twm.join()
def main():
    """Load (pair, stop) rows from SQL and watch each pair in a worker process.

    The SQL cursor and connection are closed before the pool is created;
    previously they were never closed.
    """
    print(f'starting computations on {cpu_count()} cores')

    sql = (
        "select 'BNBBTC' as pair, 0.01086300 as stop\n"
        "union\n"
        "select 'BTCUSDT', 56234"
    )

    # connect SQL server
    cnxn = pyodbc.connect(config.sql_connection)
    try:
        cursor = cnxn.cursor()
        try:
            cursor.execute(sql)
            # One (pair, stop) argument tuple per row for starmap.
            jobs = [(row.pair, row.stop) for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        cnxn.close()

    # The `with` block cleans the pool up on exit, so the explicit
    # pool.close() after starmap was redundant.
    with Pool() as pool:
        pool.starmap(process_message, jobs)
    print('pool done')
# Run only when executed as a script. multiprocessing needs this guard on
# platforms that start workers by re-importing the main module.
if __name__ == '__main__':
    main()
我尝试启动 Binance websocket 来收集 K 线(蜡烛图)数据。如果数据处理函数没有延迟,它工作得很好。但是当处理某个交易对(ticker)数据的函数出现停顿时,其他交易对的响应也会被延迟。有人知道如何让它们独立运行吗?
from binance.client import Client
from binance.websockets import BinanceSocketManager
# Binance credentials; left blank in this example.
api_key, api_secret = '', ''

# REST client, plus the websocket manager the kline streams register with.
client = Client(api_key=api_key, api_secret=api_secret)
bm = BinanceSocketManager(client, user_timeout=60)
def process(msg):
    """Print the symbol of a kline message, stalling 5 seconds for ETHUSDT.

    The sleep simulates slow per-symbol processing.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    # `time` is not imported anywhere in this snippet, so import it locally
    # instead of failing with NameError on the first ETHUSDT message.
    import time

    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)
def socket_1():
    """Open the 1-hour ETHUSDT kline stream, delivering messages to ``process``.

    Returns:
        The value returned by ``bm.start_kline_socket`` (the connection key).
        It was previously bound to an unused local and lost.
    """
    return bm.start_kline_socket('ETHUSDT', process, '1h')
def socket_2():
    """Open the 1-hour BNBUSDT kline stream, delivering messages to ``process``.

    Returns:
        The value returned by ``bm.start_kline_socket`` (the connection key).
        It was previously bound to an unused local and lost.
    """
    return bm.start_kline_socket('BNBUSDT', process, '1h')
# Register both kline streams first, then start the socket manager.
for open_stream in (socket_1, socket_2):
    open_stream()
bm.start()
我尝试按照 @Mike Malyi 的建议使用 asyncio,让两个套接字作为两个单独的任务运行,但这并没有消除延迟:
import asyncio
def process(msg):
    """Websocket callback: hand the message to ``main`` without blocking.

    ``asyncio.run`` does not return until the coroutine it runs has finished,
    so calling it directly kept this callback busy for the whole 5-second
    ETHUSDT pause. Running it on a short-lived daemon thread lets the
    callback return immediately.

    Trade-off: messages are no longer guaranteed to finish in arrival order.

    Args:
        msg: Kline payload forwarded unchanged to ``main``.
    """
    import threading

    worker = threading.Thread(
        target=asyncio.run,
        args=(main(msg),),
        daemon=True,
    )
    worker.start()
async def main(msg):
    """Await ``process_message`` for a single kline message.

    The ETHUSDT and non-ETHUSDT branches were identical, and each wrapped the
    coroutine in a task only to await it straight away, so they are collapsed
    into one direct await.

    Args:
        msg: Kline payload passed through to ``process_message``.
    """
    await process_message(msg)
async def process_message(msg):
    """Print the message's symbol, then pause 5 seconds if it is ETHUSDT.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    symbol = msg['s']
    print(symbol)
    if symbol == 'ETHUSDT':
        # Non-blocking pause standing in for slow processing.
        await asyncio.sleep(5)
# Subscribe both symbols to 1-hour klines with the same callback, then start.
eth_key, bnb_key = (
    bm.start_kline_socket(symbol, process, '1h')
    for symbol in ('ETHUSDT', 'BNBUSDT')
)
bm.start()
我也尝试在线程(threads)中使用 Queue 让各个函数独立运行,但没有帮助,一个函数仍然会延迟另一个:
from queue import Queue
def consumer(in_q):
    """Forever take messages off ``in_q`` and run ``process_message`` on each.

    Messages are handled one at a time in the order ``get()`` returns them,
    so a slow message delays every message behind it.

    Args:
        in_q: Queue-like object whose ``get()`` yields the next message.
    """
    while True:
        process_message(in_q.get())
def producer(out_q):
    """Subscribe ETHUSDT and BNBUSDT 1-hour klines, enqueueing each message.

    Args:
        out_q: Queue whose ``put`` is used as the callback for both streams.

    Returns:
        Tuple ``(eth, bnb)`` of the values returned by
        ``bm.start_kline_socket``. They were previously bound to unused
        locals and lost.
    """
    return tuple(
        bm.start_kline_socket(symbol, out_q.put, '1h')
        for symbol in ('ETHUSDT', 'BNBUSDT')
    )
def process_message(msg):
    """Print the symbol with a timestamp; ETHUSDT is held for 5 seconds first.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    # `time` is not imported in this snippet, so import it locally instead of
    # failing with NameError.
    import time

    symbol = msg['s']
    if symbol == 'ETHUSDT':
        # Simulated slow processing.
        time.sleep(5)
        print(f"{symbol} with delay, {time.strftime('%X')}")
    else:
        print(f"{symbol} {time.strftime('%X')}")
# `Thread` was used here without being imported anywhere in the snippet,
# which fails with NameError.
from threading import Thread

# One shared queue: the producer's socket callbacks fill it and the consumer
# thread drains it.
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
bm.start()
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue
# Binance credentials; left blank in this example.
api_key, api_secret = '', ''
client = Client(api_key=api_key, api_secret=api_secret)
def process_message(msg):
    """Print the symbol with a timestamp; ETHUSDT then stalls for 5 seconds.

    Args:
        msg: Kline payload; only the ``'s'`` (symbol) key is read.
    """
    symbol = msg['s']
    if symbol != 'ETHUSDT':
        print(f"{symbol} {time.strftime('%X')}")
        return

    print(f"{symbol} with delay, {time.strftime('%X')}")
    # Simulated slow processing.
    time.sleep(5)
    print('delay end')
def build_thread(symbol):
    """Run one symbol's kline stream and its message loop; never returns.

    Each call creates its own ``BinanceSocketManager`` and its own queue, so
    the messages of one symbol are processed by the loop in the calling
    thread only.

    Args:
        symbol: Trading pair to subscribe to, e.g. ``'ETHUSDT'``.
    """
    print('start thread', symbol)
    inbox = queue.Queue()
    manager = BinanceSocketManager(client, user_timeout=60)
    # The socket callback only enqueues; processing happens in the loop below.
    manager.start_kline_socket(symbol, inbox.put, '1h')
    manager.start()
    while True:
        process_message(inbox.get())
# Use joinable `threading.Thread` workers instead of bare `_thread` threads.
# With `_thread.start_new_thread` nothing waits for the workers: the main
# thread reaches the end of the script straight away, and whether the workers
# survive that is system defined.
import threading

workers = [
    threading.Thread(target=build_thread, args=(symbol,))
    for symbol in ('ETHUSDT', 'BNBUSDT')
]
for worker in workers:
    worker.start()
# build_thread loops forever, so this keeps the main thread alive with them.
for worker in workers:
    worker.join()
这段代码从 SQL 获取交易对和止损价位(这里为您提供了内联查询,因此代码可以直接运行),然后在收盘价低于止损价位时停止套接字。每个交易对都在自己的进程中运行,因此可以扩展到 CPU 可用线程的数量。
import config
from binance import ThreadedWebsocketManager
from datetime import datetime
import pyodbc
from multiprocessing import Pool, cpu_count
# Binance settings are read from the local `config` module.
KEY, SECRET, BASE_URL = (
    config.binance_key,
    config.binance_secret,
    config.binance_base_url,
)
''' ====== begin of functions ====== '''
def exec_sql(query):
    """Execute ``query`` on a fresh connection and commit it.

    The cursor and connection are now closed even when ``execute`` or
    ``commit`` raises; previously they leaked on error.

    Args:
        query: SQL text passed unchanged to ``cursor.execute``.
    """
    cnxn_p = pyodbc.connect(config.sql_connection)
    try:
        cursor_p = cnxn_p.cursor()
        try:
            cursor_p.execute(query)
            cnxn_p.commit()
        finally:
            cursor_p.close()
    finally:
        cnxn_p.close()
def process_message(pair, stop):
    """Stream klines for ``pair`` until a close price falls below ``stop``.

    Creates a dedicated ``ThreadedWebsocketManager``, prints every kline
    update, and calls ``twm.stop()`` the first time the close is below
    ``stop``. The function then waits in ``twm.join()``.

    Args:
        pair: Symbol to subscribe to, e.g. ``'BNBBTC'``.
        stop: Stop price compared against each kline's close.
    """
    print(pair)
    print(stop)
    twm = ThreadedWebsocketManager(api_key=KEY, api_secret=SECRET)
    # start is required to initialise its internal loop
    twm.start()

    def handle_socket_message(msg):
        """Print one kline update and stop the stream once close < stop."""
        # Handle error payloads first. The original read msg['k'] both before
        # and after the error check, so an error message without a 'k' entry
        # (python-binance sends e.g. {'e': 'error', 'm': ...}) raised KeyError
        # instead of being printed.
        if msg['e'] == 'error':
            print(msg)
            return

        kline = msg['k']
        # 'T' is divided by 1000 before conversion (milliseconds to seconds).
        transactiontime = datetime.fromtimestamp(
            kline['T'] / 1000
        ).strftime('%d %b %Y %H:%M:%S')
        print("{} - {} - Interval {} - Close: {} - Stop: {}".format(
            transactiontime, msg['s'], kline['i'], kline['c'], stop))

        if float(kline['c']) < stop:
            print(pair + ' close is below Stop')
            twm.stop()

    twm.start_kline_socket(callback=handle_socket_message, symbol=pair)
    twm.join()
def main():
    """Load (pair, stop) rows from SQL and watch each pair in a worker process.

    The SQL cursor and connection are closed before the pool is created;
    previously they were never closed.
    """
    print(f'starting computations on {cpu_count()} cores')

    sql = (
        "select 'BNBBTC' as pair, 0.01086300 as stop\n"
        "union\n"
        "select 'BTCUSDT', 56234"
    )

    # connect SQL server
    cnxn = pyodbc.connect(config.sql_connection)
    try:
        cursor = cnxn.cursor()
        try:
            cursor.execute(sql)
            # One (pair, stop) argument tuple per row for starmap.
            jobs = [(row.pair, row.stop) for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        cnxn.close()

    # The `with` block cleans the pool up on exit, so the explicit
    # pool.close() after starmap was redundant.
    with Pool() as pool:
        pool.starmap(process_message, jobs)
    print('pool done')
# Run only when executed as a script. multiprocessing needs this guard on
# platforms that start workers by re-importing the main module.
if __name__ == '__main__':
    main()