如何在Python中以先进先出的方式处理多个WebSockets?
How to process multiple WebSockets in a FIFO way in Python?
我正在编写一个处理两个 WebSocket 的函数,每个 WebSocket 的响应都会更改一个共享的 DataFrame df。
import json
import asyncio
import websockets
@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):
yield from dataSocket.send(dataRequest)
yield from quoteSocket.send(quoteRequest)
response = yield from dataSocket.recv() # skip first response
response = yield from quoteSocket.recv() # skip first response
while True:
response = yield from dataSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
response = yield from quoteSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
我不确定,但当前代码似乎轮流处理两个 WebSocket。我想以 "first in first out" 的方式处理响应,不管它来自哪个 WebSocket。我应该如何改变才能实现这个目标?
因为您在同一个 while 循环中使用了两个 yield from
语句,所以它将按顺序处理它们,然后无限重复。
所以它会一直等到它得到 dataSocket
的响应,然后它会等到它得到 quoteSocket
的响应,然后冲洗并重复。
Tasks()
可以很好地完成您想要做的事情,因为它们允许协程彼此独立运行。因此,如果您在各自的任务包装器中启动两个独立的协程,那么每个协程都将等待自己的下一个响应,而不必打扰另一个。
例如:
import json
import asyncio
import websockets
@asyncio.coroutine
def coroutine_1(df, dataSocket):
yield from dataSocket.send(dataRequest)
response = yield from dataSocket.recv() # skip first response
while True:
response = yield from dataSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
@asyncio.coroutine
def coroutine_2(df, quoteSocket):
yield from quoteSocket.send(quoteRequest)
response = yield from quoteSocket.recv() # skip first response
while True:
response = yield from quoteSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket):
websocket_task_1 = asyncio.ensure_future(coroutine_1(df, dataSocket))
websocket_task_2 = asyncio.ensure_future(coroutine_2(df, quoteSocket))
yield from asyncio.wait([websocket_task_1, websocket_task_2])
我正在编写一个处理两个 WebSocket 的函数,每个 WebSocket 的响应都会更改一个共享的 DataFrame df。
import json
import asyncio
import websockets
@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket, dataRequest, quoteRequest):
yield from dataSocket.send(dataRequest)
yield from quoteSocket.send(quoteRequest)
response = yield from dataSocket.recv() # skip first response
response = yield from quoteSocket.recv() # skip first response
while True:
response = yield from dataSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
response = yield from quoteSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
我不确定,但当前代码似乎轮流处理两个 WebSocket。我想以 "first in first out" 的方式处理响应,不管它来自哪个 WebSocket。我应该如何改变才能实现这个目标?
因为您在同一个 while 循环中使用了两个 yield from
语句,所以它将按顺序处理它们,然后无限重复。
所以它会一直等到它得到 dataSocket
的响应,然后它会等到它得到 quoteSocket
的响应,然后冲洗并重复。
Tasks()
可以很好地完成您想要做的事情,因为它们允许协程彼此独立运行。因此,如果您在各自的任务包装器中启动两个独立的协程,那么每个协程都将等待自己的下一个响应,而不必打扰另一个。
例如:
import json
import asyncio
import websockets
@asyncio.coroutine
def coroutine_1(df, dataSocket):
yield from dataSocket.send(dataRequest)
response = yield from dataSocket.recv() # skip first response
while True:
response = yield from dataSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
@asyncio.coroutine
def coroutine_2(df, quoteSocket):
yield from quoteSocket.send(quoteRequest)
response = yield from quoteSocket.recv() # skip first response
while True:
response = yield from quoteSocket.recv()
print("<< {}".format(json.loads(response)))
df = changeRecord(df, response)
@asyncio.coroutine
def printResponse(df, dataSocket, quoteSocket):
websocket_task_1 = asyncio.ensure_future(coroutine_1(df, dataSocket))
websocket_task_2 = asyncio.ensure_future(coroutine_2(df, quoteSocket))
yield from asyncio.wait([websocket_task_1, websocket_task_2])