每隔一分钟使用 websocket 数据触发功能

Trigger function every new minute w/ websocket data

使用 Binance futures aggregate trades stream (websocket),我试图计算每方(市场 maker/taker)每分钟所有交易的总价值。我正在努力解决的部分是试图找到一种有效的方法来识别一分钟结束和新一分钟开始的时间。到目前为止,我的解决方案是将 unix 值转换为日期时间值,仅保留日期时间值的 'minute' 部分,将其存储为变量(名为 'minute'),并检查每条新消息最新的 'minute' 值与之前的 'minute' 值相比如何。

当我 运行 脚本时,每当新的一分钟开始时,'minute' 变量就会更新,但是最后 elif 语句下的前面步骤的 none 似乎工作。

elif unixmin != minute:
    sumprodmkr = sum(x*y for x, y in list(zip(qtymkr, pricemkr)))
    sumprodtkr = sum(x*y for x, y in list(zip(qtytkr, pricetkr)))
    print (sumprodmkr)
    print (sumprodtkr)
    qtymkr.clear()
    pricemkr.clear()
    qtytkr.clear()
    pricetkr.clear()
    minute = unixmin

我确信有更有效的方法,如果有人能指出正确的方向,我将不胜感激。

完整脚本如下:

import websocket
import json
from datetime import datetime

socket = 'wss://stream.binance.com:9443/ws/btcusdt@trade'

ws = websocket.WebSocketApp(socket, on_message=on_message, on_error=on_error, on_close=on_close)

qtymkr = []
pricemkr = []
qtytkr = []
pricetkr = []

def on_message(ws, message):
    content = json.loads(message)
    ismaker = content['m']
    price = content['p']
    qty = content['q']
    unix = content['T']
    unix2 = int(content['T'])/1000
    unixmin = datetime.utcfromtimestamp(unix2).strftime('%M')
    
    if ismaker == 'True':
        qtymkr.append(float(qty))
        pricemkr.append(float(price))
    else:
        qtytkr.append(float(qty))
        pricetkr.append(float(price))
    
    global minute
    minute = 0
    
    if minute == 0:
        minute = unixmin
    elif unixmin == minute:
        pass
    elif unixmin != minute:
        sumprodmkr = sum(x*y for x, y in list(zip(qtymkr, pricemkr)))
        sumprodtkr = sum(x*y for x, y in list(zip(qtytkr, pricetkr)))
        print (sumprodmkr)
        print (sumprodtkr)
        qtymkr.clear()
        pricemkr.clear()
        qtytkr.clear()
        pricetkr.clear()
        minute = unixmin
    
def on_error(ws, error):
    print(error)
    
def on_close(ws, close_status_code, close_msg):
    print('Socket closed')

ws.run_forever()

Cron怎么样?

APScheduler - 很棒的 Python 包。享受! https://apscheduler.readthedocs.io/en/3.x/

该代码确实需要一些工作,但它很实用并且实现了我正在寻找的东西。

import websocket
import json
from datetime import datetime

socket = 'wss://stream.binance.com:9443/ws/btcusdt@trade'

qtymkr = []
pricemkr = []
qtytkr = []
pricetkr = []

minute = 0
unixmin = 0
sumprodmkr = 0 
sumprodtkr = 0

def minflag():
    global minute
    global unixmin
    global sumprodmkr
    global sumprodtkr
    if minute == 0:
        minute = unixmin
    elif unixmin == minute:
        break
    elif unixmin != minute:
        sumprodmkr = sum(x*y for x, y in list(zip(qtymkr, pricemkr)))
        sumprodtkr = sum(x*y for x, y in list(zip(qtytkr, pricetkr)))
        print (sumprodmkr)
        print (sumprodtkr)
        qtymkr.clear()
        pricemkr.clear()
        qtytkr.clear()
        pricetkr.clear()
        minute = unixmin
    

def on_message(ws, message):
    global minute
    global unixmin
    global sumprodmkr
    global sumprodtkr
    content = json.loads(message)
    ismaker = content['m']
    price = content['p']
    qty = content['q']
    unix = content['T']
    unix2 = int(content['T'])/1000
    unixmin = datetime.utcfromtimestamp(unix2).strftime('%M')
    if ismaker == True:
        qtymkr.append(float(qty))
        pricemkr.append(float(price))
    else:
        qtytkr.append(float(qty))
        pricetkr.append(float(price))
    minflag()

def on_error(ws, error):
    print(error)
    
def on_close(ws, close_status_code, close_msg):
    print('Socket closed')
        
ws = websocket.WebSocketApp(socket, on_message=on_message, on_error=on_error, on_close=on_close)

ws.run_forever()