运行 Python 异步服务器循环和 Future 并发
Running Python async server loop and Future concurrently
我正在做一个项目,我有一个简单的套接字服务器,它正在寻找连接以使用消息队列中的消息回复客户端。同时,我有一个使用 PyWin32 的监视系统,它正在查找目录中文件的更改。当 PyWin32 检测到文件更改时,它应该将信息附加到消息队列中。当服务器检测到连接时,它应该从消息队列中获取相关信息。服务器将不断寻找连接,而 PyWin32 仅在检测到更改时 运行s。
我可以 运行 他们单独成功,但不能一起。似乎我的代码挂在一个函数上,而另一个函数停止处理。我已经阅读了文档并细读了 SO 帖子,但似乎没有什么可以解决我的问题。
这是套接字服务器的示例:
class TestSocketServer:
def __init__(self,
# response_handler: Callable
):
self._alive: bool = False
# Configure client-server connection
self._host: str = 'localhost'
self._port: int = 50000
self._mq = {}
def _respond(self, reader):
request = self._mq[reader].get()
return f"Got request {request}"
async def _client_connected(self, reader, writer):
data = await reader.read(1024)
# print(reader, type(reader))
message = data.decode()
print(message)
self._mq[reader]: queue.Queue = queue.Queue()
self._mq[reader].put_nowait(message)
response = self._respond(reader)
writer.write(response.encode())
await writer.drain()
writer.close()
async def handle_traffic(self):
server = await asyncio.start_server(
self._client_connected,
self._host,
self._port)
await server.serve_forever()
def start(self):
asyncio.run(self.handle_traffic())
PyWin32 处理程序如下所示:
async def _monitor(self):
"""
Look for changes in directory and return them
:return: FILE_NOTIFY_INFORMATION
"""
ReadDirectoryChangesW(self._handle, # Python version of Windows Handle
self._buffer,
TRUE, # Monitor directory tree
# Attributes to monitor
(FILE_NOTIFY_CHANGE_SIZE |
FILE_NOTIFY_CHANGE_ATTRIBUTES |
FILE_NOTIFY_CHANGE_DIR_NAME |
FILE_NOTIFY_CHANGE_FILE_NAME |
FILE_NOTIFY_CHANGE_LAST_WRITE)
)
change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)
action, item = change[0]
act_str = EventWatcher._ACTIONS.get(action)
if change:
chg_msg = {str(time.time()): [str(act_str), item]}
# TODO try to push changes through socket before persisting
await self._write_changes(action=action, change=chg_msg)
async.run(self._monitor)
我是如何尝试实现它们的(不包括 async.run() 调用):
loop = asyncio.new_event_loop()
loop.create_task(self.handle_traffic())
loop.create_task(self.monitor())
loop.run_forever()
我还尝试在我的循环中为监控部分创建一个 Future:
loop = asyncio.new_event_loop()
loop.create_task(self.handle_traffic())
fut = loop.create_future()
loop.create_task(self.monitor(fut))
loop.run_forever()
感谢@thisisalsomypassword 指出我的 Win32 API 调用阻塞了脚本的其余部分,我需要使用线程。在下面的实现中执行此操作解决了我的问题:
class TestSocketServer:
def __init__(self,
# response_handler: Callable
):
self._alive: bool = False
# Configure client-server connection
self._host: str = 'localhost'
self._port: int = 50000
self._mq = {}
def _respond(self, reader):
request = self._mq[reader].get()
return f"Got request {request}"
async def _client_connected(self, reader, writer):
data = await reader.read(1024)
# print(reader, type(reader))
message = data.decode()
print(message)
self._mq[reader]: queue.Queue = queue.Queue()
self._mq[reader].put_nowait(message)
response = self._respond(reader)
writer.write(response.encode())
await writer.drain()
writer.close()
async def start_serving(self):
await asyncio.start_server(
self._client_connected,
self._host,
self._port)
对于观察者:
def monitor(self):
"""
Look for changes in directory and return them
:return: FILE_NOTIFY_INFORMATION
"""
while True:
ReadDirectoryChangesW(self._handle, # Python version of Windows Handle
self._buffer,
TRUE, # Monitor directory tree
# Attributes to monitor
(FILE_NOTIFY_CHANGE_SIZE |
FILE_NOTIFY_CHANGE_ATTRIBUTES |
FILE_NOTIFY_CHANGE_DIR_NAME |
FILE_NOTIFY_CHANGE_FILE_NAME |
FILE_NOTIFY_CHANGE_LAST_WRITE)
)
change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)
action, item = change[0]
print(change)
然后以这种方式实现这两种方法:
self._server = TestSocketServer()
self._watcher = EventWatcher()
def main(self):
loop = asyncio.get_event_loop()
loop.create_task(asyncio.to_thread(self._watcher.monitor))
loop.create_task(self._server.start_serving())
loop.run_forever()
main()
现在,每当有套接字请求或文件系统发生更改时,我都会看到两者都出现,并且程序会按预期继续处理未来的输入。
我正在做一个项目,我有一个简单的套接字服务器,它正在寻找连接以使用消息队列中的消息回复客户端。同时,我有一个使用 PyWin32 的监视系统,它正在查找目录中文件的更改。当 PyWin32 检测到文件更改时,它应该将信息附加到消息队列中。当服务器检测到连接时,它应该从消息队列中获取相关信息。服务器将不断寻找连接,而 PyWin32 仅在检测到更改时 运行s。
我可以 运行 他们单独成功,但不能一起。似乎我的代码挂在一个函数上,而另一个函数停止处理。我已经阅读了文档并细读了 SO 帖子,但似乎没有什么可以解决我的问题。
这是套接字服务器的示例:
class TestSocketServer:
def __init__(self,
# response_handler: Callable
):
self._alive: bool = False
# Configure client-server connection
self._host: str = 'localhost'
self._port: int = 50000
self._mq = {}
def _respond(self, reader):
request = self._mq[reader].get()
return f"Got request {request}"
async def _client_connected(self, reader, writer):
data = await reader.read(1024)
# print(reader, type(reader))
message = data.decode()
print(message)
self._mq[reader]: queue.Queue = queue.Queue()
self._mq[reader].put_nowait(message)
response = self._respond(reader)
writer.write(response.encode())
await writer.drain()
writer.close()
async def handle_traffic(self):
server = await asyncio.start_server(
self._client_connected,
self._host,
self._port)
await server.serve_forever()
def start(self):
asyncio.run(self.handle_traffic())
PyWin32 处理程序如下所示:
async def _monitor(self):
"""
Look for changes in directory and return them
:return: FILE_NOTIFY_INFORMATION
"""
ReadDirectoryChangesW(self._handle, # Python version of Windows Handle
self._buffer,
TRUE, # Monitor directory tree
# Attributes to monitor
(FILE_NOTIFY_CHANGE_SIZE |
FILE_NOTIFY_CHANGE_ATTRIBUTES |
FILE_NOTIFY_CHANGE_DIR_NAME |
FILE_NOTIFY_CHANGE_FILE_NAME |
FILE_NOTIFY_CHANGE_LAST_WRITE)
)
change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)
action, item = change[0]
act_str = EventWatcher._ACTIONS.get(action)
if change:
chg_msg = {str(time.time()): [str(act_str), item]}
# TODO try to push changes through socket before persisting
await self._write_changes(action=action, change=chg_msg)
async.run(self._monitor)
我是如何尝试实现它们的(不包括 async.run() 调用):
loop = asyncio.new_event_loop()
loop.create_task(self.handle_traffic())
loop.create_task(self.monitor())
loop.run_forever()
我还尝试在我的循环中为监控部分创建一个 Future:
loop = asyncio.new_event_loop()
loop.create_task(self.handle_traffic())
fut = loop.create_future()
loop.create_task(self.monitor(fut))
loop.run_forever()
感谢@thisisalsomypassword 指出我的 Win32 API 调用阻塞了脚本的其余部分,我需要使用线程。在下面的实现中执行此操作解决了我的问题:
class TestSocketServer:
def __init__(self,
# response_handler: Callable
):
self._alive: bool = False
# Configure client-server connection
self._host: str = 'localhost'
self._port: int = 50000
self._mq = {}
def _respond(self, reader):
request = self._mq[reader].get()
return f"Got request {request}"
async def _client_connected(self, reader, writer):
data = await reader.read(1024)
# print(reader, type(reader))
message = data.decode()
print(message)
self._mq[reader]: queue.Queue = queue.Queue()
self._mq[reader].put_nowait(message)
response = self._respond(reader)
writer.write(response.encode())
await writer.drain()
writer.close()
async def start_serving(self):
await asyncio.start_server(
self._client_connected,
self._host,
self._port)
对于观察者:
def monitor(self):
"""
Look for changes in directory and return them
:return: FILE_NOTIFY_INFORMATION
"""
while True:
ReadDirectoryChangesW(self._handle, # Python version of Windows Handle
self._buffer,
TRUE, # Monitor directory tree
# Attributes to monitor
(FILE_NOTIFY_CHANGE_SIZE |
FILE_NOTIFY_CHANGE_ATTRIBUTES |
FILE_NOTIFY_CHANGE_DIR_NAME |
FILE_NOTIFY_CHANGE_FILE_NAME |
FILE_NOTIFY_CHANGE_LAST_WRITE)
)
change = FILE_NOTIFY_INFORMATION(self._buffer, 9999)
action, item = change[0]
print(change)
然后以这种方式实现这两种方法:
self._server = TestSocketServer()
self._watcher = EventWatcher()
def main(self):
loop = asyncio.get_event_loop()
loop.create_task(asyncio.to_thread(self._watcher.monitor))
loop.create_task(self._server.start_serving())
loop.run_forever()
main()
现在,每当有套接字请求或文件系统发生更改时,我都会看到两者都出现,并且程序会按预期继续处理未来的输入。