如何独立 运行 websockets

How to run websockets independently

我尝试启动 Binance websocket 来收集蜡烛数据。如果数据处理功能没有延迟,它会很好地工作。但是当处理一个代码数据的函数发生一些暂停时,它也会延迟对其他代码的响应。有人知道如何独立 运行 他们吗?

from binance.client import Client
from binance.websockets import BinanceSocketManager

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)
bm = BinanceSocketManager(client, user_timeout=60)

def process(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)

def socket_1():
     conn_key = bm.start_kline_socket('ETHUSDT', process, '1h')

def socket_2():
     conn_key = bm.start_kline_socket('BNBUSDT', process, '1h')

socket_1()
socket_2()

bm.start()

我尝试按照@Mike Malyi 的建议使用 asyncio 使套接字 运行 成为两个单独的任务,但它并没有消除延迟:

import asyncio

def process(msg):
    asyncio.run(main(msg))

async def main(msg):
    if msg['s'] == 'ETHUSDT':
        task1 = asyncio.create_task(process_message(msg))
        await task1
    else:
        task2 = asyncio.create_task(process_message(msg))
        await task2

async def process_message(msg):
    print(msg['s'])
    if msg['s'] == 'ETHUSDT':
        await asyncio.sleep(5)

eth_key = bm.start_kline_socket('ETHUSDT', process, '1h')
bnb_key = bm.start_kline_socket('BNBUSDT', process, '1h')

bm.start()

我也尝试在 threads 中使用 Queue 独立地创建函数 运行,但它没有帮助,一个函数仍然延迟另一个:

from queue import Queue

def consumer(in_q):
    while True:
        msg = in_q.get()
        process_message(msg)
    
def producer(out_q):
    eth = bm.start_kline_socket('ETHUSDT', out_q.put, '1h')
    bnb = bm.start_kline_socket('BNBUSDT', out_q.put, '1h')

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
        time.sleep(5)
        print(f"{msg['s']} with delay, {time.strftime('%X')}")
    else:
        print(f"{msg['s']} {time.strftime('%X')}")


q = Queue()
t1 = Thread(target = consumer, args =(q, )) 
t2 = Thread(target = producer, args =(q, )) 
t1.start() 
t2.start() 

bm.start() 
from binance.client import Client
from binance.websockets import BinanceSocketManager
import _thread as thread
import time
import queue

api_key = ''
api_secret = ''
client = Client(api_key, api_secret)

def process_message(msg):
    if msg['s'] == 'ETHUSDT':
      print(f"{msg['s']} with delay, {time.strftime('%X')}")
      time.sleep(5)
      print('delay end')  
    else:
        print(f"{msg['s']} {time.strftime('%X')}")
  

def build_thread (symbol):
  print('start thread', symbol)
  q = queue.Queue()
  bm = BinanceSocketManager(client, user_timeout=60)
  conn_key = bm.start_kline_socket(symbol, q.put, '1h')
  bm.start()
  while(True):
    msg = q.get()
    process_message(msg)

thread.start_new_thread(build_thread, ('ETHUSDT', ))  
thread.start_new_thread(build_thread, ('BNBUSDT', ))  

这是为了从 SQL 获取货币对和停止价位(为您提供内联查询,因此代码仍然有效)然后在停止价位低于收盘价时停止套接字。每对都在自己的进程中运行,因此将扩展到 CPU 可用线程的数量。

import config
from binance import ThreadedWebsocketManager
from datetime import datetime
import pyodbc
from multiprocessing import Pool, cpu_count

KEY = config.binance_key
SECRET = config.binance_secret
BASE_URL = config.binance_base_url

''' ======  begin of functions ====== '''
def exec_sql (query) :
    cnxn_p = pyodbc.connect(config.sql_connection)
    cursor_p = cnxn_p.cursor()
    cursor_p.execute(query)
    cnxn_p.commit()
    cursor_p.close()
    cnxn_p.close()
    
def process_message(pair,stop):
    print(pair)
    print(stop)

    twm = ThreadedWebsocketManager(api_key=KEY, api_secret=SECRET)
    # start is required to initialise its internal loop
    twm.start()

    def handle_socket_message(msg):
        transactiontime = msg['k']['T'] / 1000
        transactiontime = datetime.fromtimestamp(transactiontime).strftime('%d %b %Y %H:%M:%S')

        if msg['e'] != 'error':
            # print("{} - {} - Interval {} - Open: {} - Close: {} - High: {} - Low: {} - Volume: {}".
            #      format(transactiontime,msg['s'],msg['k']['i'],msg['k']['o'],msg['k']['c'],msg['k']['h'],msg['k']['l'],msg['k']['v']))
            print("{} - {} - Interval {} - Close: {} - Stop: {}".
                 format(transactiontime,msg['s'],msg['k']['i'],msg['k']['c'], stop ))
        else:
            print(msg)

        Close = float(msg['k']['c'])
        if Close < stop:
            print(pair + ' close is below Stop')
            twm.stop()

    twm.start_kline_socket(callback=handle_socket_message, symbol=pair)
    twm.join()  

def main():
       
    print(f'starting computations on {cpu_count()} cores')

    # connect SQL server
    cnxn = pyodbc.connect(config.sql_connection)
    cursor = cnxn.cursor()
    sql = """select 'BNBBTC' as pair, 0.01086300 as stop
            union
            select 'BTCUSDT', 56234"""
    cursor.execute(sql)
    
    # iterate pairs
    rows = cursor.fetchall()
    pairs = []
    stops = []
    for row in rows:       
        pairs.append(row.pair)
        stops.append(row.stop)

    with Pool() as pool:
        pool.starmap(process_message, zip(pairs,stops))
    pool.close()

    print('pool done')    

    
if __name__ == '__main__':
    main()