新消息到达时终止先前的高速公路 websocket 调用
Terminate previous autobahn websocket call when a new message arrives
我正在设计一个功能,通过 websocket 提供类似自动完成的搜索结果。当用户键入速度足够快时,他们之前的查询通常会变得过时,因为他们已经在询问新信息。
有什么方法可以识别新查询何时进入,并终止先前的查询吗?当收到新消息时,我尝试检查查询是否正在处理,但似乎新消息仅在 之前的查询(我想取消的查询)完成后才处理。我也很困惑当有多个用户同时搜索时这将如何工作。
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
import json
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
running = 'no'
def main_search(query): # May take up to 400ms to process
...
class SearchServerProtocol(WebSocketServerProtocol):
@inlineCallbacks
def onMessage(self, payload, isBinary):
if not isBinary:
x = json.loads(payload.decode('utf8'))
global running
try:
print running
running = 'yes'
res = yield main_search(x['query'])
running = 'no'
print x['query']
except Exception as e:
print e
self.sendClose(1000)
else:
self.sendMessage(json.dumps(res).encode('utf8'))
if __name__ == '__main__':
import sys
from twisted.python import log
from twisted.internet import reactor
log.startLogging(sys.stdout)
factory = WebSocketServerFactory("ws://104.236.31.77:8080", debug=False)
factory.protocol = SearchServerProtocol
reactor.listenTCP(8080, factory)
reactor.run()
print running
总是returns没有
谢谢!
我认为您需要某种表示您的用户会话(或搜索会话)的上下文对象。在这种情况下,您可以放置一个 latestSearchId
来递增每次搜索。然后您可以将 searchId 参数添加到 main_search
。假设您有一个循环或某种可以中止的不同阶段,您可以通过将其与搜索会话的 latestSearchId 进行比较来测试当前搜索中的 searchId 是否仍然是最新的。
另一种方法(例如,如果您不能中止搜索)可能是在计算搜索之前等待几毫秒,同时检查是否有任何新的搜索进入。
根据您的评论编辑。
你遇到的问题是你永远不应该阻塞反应器回路。你需要做的是将你的 main_search
切成碎片,这样你就可以 return 控制反应堆回路。
您是否可以做类似的事情:
def resume_search(position):
#stuff
reactor.callLater(0, resume_search, current_position)
reactor.callLater 将安排您的函数在完成其工作后立即调用。您应该将反应器循环视为一个大 while True
,基本上可以完成 3 件事
- 检查传入 IO
- 执行你的东西(处理事件)
- 发送传出 IO
只要您的代码保持 运行ning 就永远不会到达其他两个。因此,当您插入 reactor.callLater 时,它会 运行 反应器循环,直到延迟变量(在我们的例子中可以为 0)过去。请注意,不能保证它会及时调用您的函数。这是因为很可能某些东西 运行ning(阻塞反应器)比您指定的时间间隔长。所以你应该把这个 reactor.callLater(0, fun)
看成 "call me when you're not busy"
我正在设计一个功能,通过 websocket 提供类似自动完成的搜索结果。当用户键入速度足够快时,他们之前的查询通常会变得过时,因为他们已经在询问新信息。
有什么方法可以识别新查询何时进入,并终止先前的查询吗?当收到新消息时,我尝试检查查询是否正在处理,但似乎新消息仅在 之前的查询(我想取消的查询)完成后才处理。我也很困惑当有多个用户同时搜索时这将如何工作。
from autobahn.twisted.websocket import WebSocketServerProtocol, WebSocketServerFactory
import json
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
running = 'no'
def main_search(query): # May take up to 400ms to process
...
class SearchServerProtocol(WebSocketServerProtocol):
@inlineCallbacks
def onMessage(self, payload, isBinary):
if not isBinary:
x = json.loads(payload.decode('utf8'))
global running
try:
print running
running = 'yes'
res = yield main_search(x['query'])
running = 'no'
print x['query']
except Exception as e:
print e
self.sendClose(1000)
else:
self.sendMessage(json.dumps(res).encode('utf8'))
if __name__ == '__main__':
import sys
from twisted.python import log
from twisted.internet import reactor
log.startLogging(sys.stdout)
factory = WebSocketServerFactory("ws://104.236.31.77:8080", debug=False)
factory.protocol = SearchServerProtocol
reactor.listenTCP(8080, factory)
reactor.run()
print running
总是returns没有
谢谢!
我认为您需要某种表示您的用户会话(或搜索会话)的上下文对象。在这种情况下,您可以放置一个 latestSearchId
来递增每次搜索。然后您可以将 searchId 参数添加到 main_search
。假设您有一个循环或某种可以中止的不同阶段,您可以通过将其与搜索会话的 latestSearchId 进行比较来测试当前搜索中的 searchId 是否仍然是最新的。
另一种方法(例如,如果您不能中止搜索)可能是在计算搜索之前等待几毫秒,同时检查是否有任何新的搜索进入。
根据您的评论编辑。
你遇到的问题是你永远不应该阻塞反应器回路。你需要做的是将你的 main_search
切成碎片,这样你就可以 return 控制反应堆回路。
您是否可以做类似的事情:
def resume_search(position):
#stuff
reactor.callLater(0, resume_search, current_position)
reactor.callLater 将安排您的函数在完成其工作后立即调用。您应该将反应器循环视为一个大 while True
,基本上可以完成 3 件事
- 检查传入 IO
- 执行你的东西(处理事件)
- 发送传出 IO
只要您的代码保持 运行ning 就永远不会到达其他两个。因此,当您插入 reactor.callLater 时,它会 运行 反应器循环,直到延迟变量(在我们的例子中可以为 0)过去。请注意,不能保证它会及时调用您的函数。这是因为很可能某些东西 运行ning(阻塞反应器)比您指定的时间间隔长。所以你应该把这个 reactor.callLater(0, fun)
看成 "call me when you're not busy"