如果 API 页面无效,如何关闭 websocket 连接

How do you close a websocket connection if the API page is invalid

我正在使用 Binance API。我正在连接到他们的 API 并尝试评估 Binance 是否在他们的平台上有资产列表。资产列表如下:

assets = ['tribe', 'pax']

我通过将资产名称插入 SOCKET link:

将其传递给他们的 API
SOCKET = f"wss://stream.binance.com:9443/ws/{asset}usdt@ticker"

如果调用 on_message,我知道资产确实存在于他们的网站上,因为那时我已经与他们的 API 建立了一致的连接,并且它将继续打印消息,除非我关闭连接(我这样做)。但是,如果在 n 时间内没有收到消息,我就知道他们没有我要找的资产。在这种情况下,Binance 确实有 tribe,但没有 pax。如果资产在 n 次后不在他们的网站上,我想关闭连接,我该怎么办?

import ssl
import websocket

def on_open(ws):
    print('connection: successful')


def on_close(ws, *args):
    print('connection: lost')
    print("---------------------------------------------------")
    ws.close()


def on_message(ws, message):
    print("message received")
    print()
    ws.close()


def on_error(ws, message):
    print(message)
    print()


assets = ['tribe', 'pax']
for asset in assets:
    print(asset)
    SOCKET = f"wss://stream.binance.com:9443/ws/{asset}usdt@ticker"


    ws = websocket.WebSocketApp(SOCKET, on_open=on_open, on_close=on_close, on_message=on_message,
                                on_error=on_error)

    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

我尝试写:

if on_message == False:
    ws.close()

但是这不起作用,因为据我所知 on_message 甚至没有被调用。

这是币安 API 文档: https://github.com/binance/binance-spot-api-docs/blob/master/web-socket-streams.md#klinecandlestick-streams 这是 websocket-client 文档: https://websocket-client.readthedocs.io/en/latest/examples.html

试试这个:-

import websocket
import ssl
import time
from threading import Thread


class Binance():
    def __init__(self, asset, timeout=5):
        self.url = f'wss://stream.binance.com:9443/ws/{asset}usdt@ticker'
        self.ws = None
        self.mr = False
        self.timeout = timeout

    def start(self):
        self.ws = websocket.WebSocketApp(self.url, on_message=self.on_message)
        Thread(target=self.monitor).start()
        self.ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

    def on_message(self, ws, message):
        self.mr = True
        print(message)

    def stop(self):
        if self.ws:
            self.ws.close()
            self.ws = None

    def monitor(self):
        while True:
            time.sleep(self.timeout)
            if not self.mr:
                self.stop()
                break
            self.mr = False


def runner(asset):
    Binance(asset).start()


for asset in ['pax', 'tribe']:
    Thread(target=runner, args=(asset,)).start()

这很粗糙,但它有效


import ssl
import websocket
import requests
import json
import pprint

def on_open(ws):
    print('connection: successful')


def on_close(ws, *args):
    print('connection: lost')
    print()
    ws.close()


def on_message(ws, message):
    json_message = json.loads(message)
    pprint.pprint(json_message)
    ws.close()


def on_error(ws, message):
    print("error")
    print()


assets = ['TRIBE', 'PAX', 'OGN', 'RAI', 'QNT', 'BTC']
for asset in assets:
    url = f"https://www.binance.com/en/trade/{asset}_USDT"
    soup = requests.get(url).text

    if "https://bin.bnbstatic.com/static/images/electron/404-error.png" in soup:
        print(f"{asset} not on coinbase")


    else:
        print(asset, "on coinbase")
        SOCKET = f"wss://stream.binance.com:9443/ws/{asset.lower()}usdt@ticker"
        print(SOCKET)

        ws = websocket.WebSocketApp(SOCKET, on_open=on_open, on_close=on_close, on_message=on_message, on_error=on_error)
        ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

    print("------------------------------")
    print()


此外,这并没有使用您所想的方法,但它确实也像您想要的那样工作(我认为)。

我在研究 ML 引擎的代码提要时遇到了这个线程。我最初认为该示例使用了当前的 websocket package, which is really old (2010?) and not maintained anymore, but it probably uses the websock-client 包。两者都被命名为 websock。我没有深入调查,但较新的软件包似乎很有成效。

另一种方法是使用新的 websockets 包。这是使用协程 (asyncio) 的单线程来自 coinbase 的代码提要。请注意,请求在所有 assets/products 上发送一次,然后循环接收代码。如果有任何失败,try-catch 将是 运行。

def run_feed():
    '''
    simple test for pro.coinbase ticker feed
    See https://websockets.readthedocs.io/en/stable/    
    '''
    feed_done = False

    def sig_handler(signo, frame):
        nonlocal feed_done
        feed_done = True

    # url="wss://ws-feed.pro.coinbase.com"
    url="wss://ws-feed.pro.coinbase1.com"    
    prodlist = ['ETH-USD', 'DOT-USD']
    json_req = json.dumps(
        {
            "type": "subscribe",
            "product_ids": prodlist,
            "channels": ["matches"],
        }
    )
    
    async def call_api(request):
        nonlocal feed_done
        try:
            async with websockets.connect(url) as ws:
                await ws.send(request)
                while not feed_done:
                    recv = await ws.recv()
                    print_recv(json.loads(recv))
                    print(f'exit feed loop and close websocket')
                    await ws.close()
        except Exception as e:
            print(f'exception: doc={e.__doc__} str={e}')

    signal.signal(signal.SIGINT, sig_handler)                
    asyncio.get_event_loop().run_until_complete(call_api(json_req))