Binance 多线程套接字 - 不同时调用的函数

Binance Multithread Sockets - functions not called concurrently

我有一个代码可以从 binance 接收有关当前价格的数据:

import asyncio
from binance import AsyncClient, BinanceSocketManager
import time
from datetime import datetime


def analyze(res):
    kline = res['k']

    if kline['x']: #candle is compleated
        print('{} start_sleeping {} {}'.format(
            datetime.now(),
            kline['s'],
            datetime.fromtimestamp(kline['t'] / 1000),
        ))
        time.sleep(5)
        print('{} finish_sleeping {}'.format(datetime.now(), kline['s']))


async def open_binance_stream(symbol):
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    ts = bm.kline_socket(symbol)
    async with ts as tscm:
        while True:
            res = await tscm.recv()
            analyze(res)

    await client.close_connection()


async def main():
    t1 = asyncio.create_task(open_binance_stream('ETHBTC'))
    t2 = asyncio.create_task(open_binance_stream('XRPBTC'))
    await asyncio.gather(*[t1, t2])


if __name__ == "__main__":
    asyncio.run(main())

如何使analyze函数被并发调用。 币安同时发送信息和两个流数据(ETHBTC 和 XRPBTC)

但是函数 analyze 只有在前一个 analyze(睡眠)完成后才会被调用。

我希望函数 analyze 被立即独立调用。

您是否尝试过将 分析 放入线程中。我想它会实现你想要的。

import asyncio
from binance import AsyncClient, BinanceSocketManager
import time
from datetime import datetime
from threading import Thread

def analyze(res):
    kline = res['k']

    if kline['x']: #candle is compleated
        print('{} start_sleeping {} {}'.format(
            datetime.now(),
            kline['s'],
            datetime.fromtimestamp(kline['t'] / 1000),
        ))
        time.sleep(5)
        print('{} finish_sleeping {}'.format(datetime.now(), kline['s']))


async def open_binance_stream(symbol):
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    ts = bm.kline_socket(symbol)
    async with ts as tscm:
        while True:
            res = await tscm.recv()
            Thread(target= analyze, args = (res)).start()

    await client.close_connection()


async def main():
    t1 = asyncio.create_task(open_binance_stream('ETHBTC'))
    t2 = asyncio.create_task(open_binance_stream('XRPBTC'))
    await asyncio.gather(*[t1, t2])


if __name__ == "__main__":
    asyncio.run(main())

这应该按预期工作。