正确使用带有 websocket 的 streamz

Correct use of streamz with websocket

我正在尝试找出使用 streamz 处理流数据的正确方法。我的流数据是使用 websocket-client 加载的,之后我这样做:

# open a stream and push updates into the stream
stream = Stream()

# establish a connection
ws = create_connection("ws://localhost:8765")

# get continuous updates
from tornado import gen
from tornado.ioloop import IOLoop

async def f():
    while True:
        await gen.sleep(0.001)
        data = ws.recv()
        stream.emit(data)
        
IOLoop.current().add_callback(f)

虽然这行得通,但我发现我的流跟不上流式数据(所以我在流中看到的数据比流式数据晚几秒,既高容量又高频率) .我尝试将 gen.sleep(0.001) 设置为较小的值(删除它会完全停止 jupyter 实验室),但问题仍然存在。

这是使用 websocket 将 streamz 与流数据连接的正确方法吗?

我认为 websocket-client 不提供异步 API,因此,它会阻塞事件循环。

您应该使用异步 websocket 客户端,例如 Tornado provides:

from tornado.websocket import websocket_connect

ws = websocket_connect("ws://localhost:8765")

async def f():
    while True:
        data = await ws.read_message()

        if data is None:
            break
        else:
            await stream.emit(data)

        # considering you're receiving data from a localhost
        # socket, it will be really fast, and the `await` 
        # statement above won't pause the while-loop for 
        # enough time for the event loop to have chance to 
        # run other things.
        # Therefore, sleep for a small time to suspend the 
        # while-loop.

        await gen.sleep(0.0001) 

如果你是 receiving/sending 数据 from/to 远程连接,你不需要休眠,它的速度足以在 await 语句处暂停 while 循环。