正确使用带有 websocket 的 streamz
Correct use of streamz with websocket
我正在尝试找出使用 streamz
处理流数据的正确方法。我的流数据是使用 websocket-client
加载的,之后我这样做:
# open a stream and push updates into the stream
stream = Stream()
# establish a connection
ws = create_connection("ws://localhost:8765")
# get continuous updates
from tornado import gen
from tornado.ioloop import IOLoop
async def f():
while True:
await gen.sleep(0.001)
data = ws.recv()
stream.emit(data)
IOLoop.current().add_callback(f)
虽然这行得通,但我发现我的流跟不上流式数据(所以我在流中看到的数据比流式数据晚几秒,既高容量又高频率) .我尝试将 gen.sleep(0.001)
设置为较小的值(删除它会完全停止 jupyter 实验室),但问题仍然存在。
这是使用 websocket 将 streamz 与流数据连接的正确方法吗?
我认为 websocket-client
不提供异步 API,因此,它会阻塞事件循环。
您应该使用异步 websocket 客户端,例如 Tornado provides:
from tornado.websocket import websocket_connect
ws = websocket_connect("ws://localhost:8765")
async def f():
while True:
data = await ws.read_message()
if data is None:
break
else:
await stream.emit(data)
# considering you're receiving data from a localhost
# socket, it will be really fast, and the `await`
# statement above won't pause the while-loop for
# enough time for the event loop to have chance to
# run other things.
# Therefore, sleep for a small time to suspend the
# while-loop.
await gen.sleep(0.0001)
如果你是 receiving/sending 数据 from/to 远程连接,你不需要休眠,它的速度足以在 await
语句处暂停 while 循环。
我正在尝试找出使用 streamz
处理流数据的正确方法。我的流数据是使用 websocket-client
加载的,之后我这样做:
# open a stream and push updates into the stream
stream = Stream()
# establish a connection
ws = create_connection("ws://localhost:8765")
# get continuous updates
from tornado import gen
from tornado.ioloop import IOLoop
async def f():
while True:
await gen.sleep(0.001)
data = ws.recv()
stream.emit(data)
IOLoop.current().add_callback(f)
虽然这行得通,但我发现我的流跟不上流式数据(所以我在流中看到的数据比流式数据晚几秒,既高容量又高频率) .我尝试将 gen.sleep(0.001)
设置为较小的值(删除它会完全停止 jupyter 实验室),但问题仍然存在。
这是使用 websocket 将 streamz 与流数据连接的正确方法吗?
我认为 websocket-client
不提供异步 API,因此,它会阻塞事件循环。
您应该使用异步 websocket 客户端,例如 Tornado provides:
from tornado.websocket import websocket_connect
ws = websocket_connect("ws://localhost:8765")
async def f():
while True:
data = await ws.read_message()
if data is None:
break
else:
await stream.emit(data)
# considering you're receiving data from a localhost
# socket, it will be really fast, and the `await`
# statement above won't pause the while-loop for
# enough time for the event loop to have chance to
# run other things.
# Therefore, sleep for a small time to suspend the
# while-loop.
await gen.sleep(0.0001)
如果你是 receiving/sending 数据 from/to 远程连接,你不需要休眠,它的速度足以在 await
语句处暂停 while 循环。