为 await 函数创建另一个线程
Create another thread for an await function
我是第一次使用 Webserver,我之前使用过套接字和并行性,但它非常不同且简单,它没有使用 Async 作为并行性。
我的目标很简单,我有我的服务器和我的客户端。在我的客户端中,我想创建一个单独的线程来接收服务器将发送的消息,并在前一个线程中做一些其他事情,如代码示例 (client.py):
from typing import Dict
import websockets
import asyncio
import json
URL = "my localhost webserver"
connection = None
async def listen() -> None:
global connection
input("Press enter to connect.")
async with websockets.connect(URL) as ws:
connection = ws
msg_initial: Dict[str,str] = get_dict()
await ws.send(json.dumps(msg_initial))
## This i want to be in another thread
await receive_msg()
print("I`m at listener`s thread")
# do some stuffs
async def recieve_msg() -> None:
while True:
msg = await connection.recv()
print(f"Server: {msg}")
asyncio.get_event_loop().run_until_complete(listen())
为了让我收到消息,我需要在 recv()
中使用 await
,但我不知道如何为此创建一个单独的线程。我已经尝试使用 threading
创建一个单独的线程,但它没有用。
有谁知道怎么做,是否可行?
不清楚你想做什么可以按照你建议的方式完成。在下面的示例中,我连接到回显服务器。直接实现您所建议的最直接的方法是创建一个将连接传递到的新线程。但这并不完全有效:
import websockets
import asyncio
from threading import Thread
URL = "ws://localhost:4000"
async def listen() -> None:
async with websockets.connect(URL) as ws:
# pass connection:
t = Thread(target=receiver_thread, args=(ws,))
t.start()
# Generate some messages to be echoed back:
await ws.send('msg1')
await ws.send('msg2')
await ws.send('msg3')
await ws.send('msg4')
await ws.send('msg5')
def receiver_thread(connection):
print("I`m at listener`s thread")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(receive_msg(connection))
async def receive_msg(connection) -> None:
while True:
msg = await connection.recv()
print(f"Server: {msg}")
asyncio.get_event_loop().run_until_complete(listen())
打印:
I`m at listener`s thread
Server: msg1
Server: msg2
Server: msg3
Server: msg4
Server: msg5
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\Program Files\Python38\lib\threading.py", line 932, in _bootstrap_inner
self.run()
File "C:\Program Files\Python38\lib\threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "C:\Ron\test\test.py", line 22, in receiver_thread
loop.run_until_complete(receive_msg(connection))
File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
return future.result()
File "C:\Ron\test\test.py", line 29, in receive_msg
msg = await connection.recv()
File "C:\Program Files\Python38\lib\site-packages\websockets\legacy\protocol.py", line 404, in recv
await asyncio.wait(
File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in wait
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in <setcomp>
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 667, in ensure_future
raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument
消息接收正常,但问题出现在语句 receiver_thread
的函数中:
loop.run_until_complete(receive_msg(connection))
根据需要,启动的线程没有 运行 事件循环,并且不能使用函数 listen
正在使用的事件循环,因此必须创建一个新的事件循环。如果此 thread/event 循环不使用来自差异事件循环的任何资源(即连接),那就没问题了:
import websockets
import asyncio
from threading import Thread
URL = "ws://localhost:4000"
async def listen() -> None:
async with websockets.connect(URL) as ws:
t = Thread(target=receiver_thread)
t.start()
def receiver_thread():
print("I`m at listener`s thread")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(receive_msg())
async def receive_msg() -> None:
await asyncio.sleep(2)
print('I just slept for 2 seconds')
asyncio.get_event_loop().run_until_complete(listen())
打印:
I`m at listener`s thread
I just slept for 2 seconds
我看不出真正需要 运行 基于你展示的最少代码的线程中的任何东西,但假设你省略了显示对接收到的消息的一些处理,而 asyncio
单独不是足够了,那么也许您需要做的就是在当前 运行 循环中接收消息(在函数 listen
中)并使用线程处理消息:
from typing import Dict
import websockets
import asyncio
import json
from threading import Thread
URL = "my localhost webserver"
async def listen() -> None:
input("Press enter to connect.")
async with websockets.connect(URL) as ws:
msg_initial: Dict[str,str] = get_dict()
await ws.send(json.dumps(msg_initial))
while True:
msg = await ws.recv()
print(f"Server: {msg}")
# Non-daemon threads so program will not end until these threads terminate:
t = Thread(target=process_msg, args=(msg,))
t.start()
asyncio.get_event_loop().run_until_complete(listen())
更新
根据你对我关于创建聊天程序的回答的最后评论,你应该使用纯多线程或纯异步来实现它。这是使用 asyncio 的粗略概述:
import websockets
import asyncio
import aioconsole
URL = "my localhost webserver"
async def receiver(connection):
while True:
msg = await connection.recv()
print(f"\nServer: {msg}")
async def sender(connection):
while True:
msg = await aioconsole.ainput('\nEnter msg: ')
await connection.send(msg)
async def chat() -> None:
async with websockets.connect(URL) as ws:
await asyncio.gather(
receiver(ws),
sender(ws)
)
asyncio.get_event_loop().run_until_complete(chat())
但是,您可以使用 asyncio 执行的用户输入类型可能会受到限制。因此,我认为多线程可能是更好的方法。
我是第一次使用 Webserver,我之前使用过套接字和并行性,但它非常不同且简单,它没有使用 Async 作为并行性。
我的目标很简单,我有我的服务器和我的客户端。在我的客户端中,我想创建一个单独的线程来接收服务器将发送的消息,并在前一个线程中做一些其他事情,如代码示例 (client.py):
from typing import Dict
import websockets
import asyncio
import json
URL = "my localhost webserver"
connection = None
async def listen() -> None:
global connection
input("Press enter to connect.")
async with websockets.connect(URL) as ws:
connection = ws
msg_initial: Dict[str,str] = get_dict()
await ws.send(json.dumps(msg_initial))
## This i want to be in another thread
await receive_msg()
print("I`m at listener`s thread")
# do some stuffs
async def recieve_msg() -> None:
while True:
msg = await connection.recv()
print(f"Server: {msg}")
asyncio.get_event_loop().run_until_complete(listen())
为了让我收到消息,我需要在 recv()
中使用 await
,但我不知道如何为此创建一个单独的线程。我已经尝试使用 threading
创建一个单独的线程,但它没有用。
有谁知道怎么做,是否可行?
不清楚你想做什么可以按照你建议的方式完成。在下面的示例中,我连接到回显服务器。直接实现您所建议的最直接的方法是创建一个将连接传递到的新线程。但这并不完全有效:
import websockets
import asyncio
from threading import Thread
URL = "ws://localhost:4000"
async def listen() -> None:
async with websockets.connect(URL) as ws:
# pass connection:
t = Thread(target=receiver_thread, args=(ws,))
t.start()
# Generate some messages to be echoed back:
await ws.send('msg1')
await ws.send('msg2')
await ws.send('msg3')
await ws.send('msg4')
await ws.send('msg5')
def receiver_thread(connection):
print("I`m at listener`s thread")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(receive_msg(connection))
async def receive_msg(connection) -> None:
while True:
msg = await connection.recv()
print(f"Server: {msg}")
asyncio.get_event_loop().run_until_complete(listen())
打印:
I`m at listener`s thread
Server: msg1
Server: msg2
Server: msg3
Server: msg4
Server: msg5
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\Program Files\Python38\lib\threading.py", line 932, in _bootstrap_inner
self.run()
File "C:\Program Files\Python38\lib\threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "C:\Ron\test\test.py", line 22, in receiver_thread
loop.run_until_complete(receive_msg(connection))
File "C:\Program Files\Python38\lib\asyncio\base_events.py", line 616, in run_until_complete
return future.result()
File "C:\Ron\test\test.py", line 29, in receive_msg
msg = await connection.recv()
File "C:\Program Files\Python38\lib\site-packages\websockets\legacy\protocol.py", line 404, in recv
await asyncio.wait(
File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in wait
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 424, in <setcomp>
fs = {ensure_future(f, loop=loop) for f in set(fs)}
File "C:\Program Files\Python38\lib\asyncio\tasks.py", line 667, in ensure_future
raise ValueError('The future belongs to a different loop than '
ValueError: The future belongs to a different loop than the one specified as the loop argument
消息接收正常,但问题出现在语句 receiver_thread
的函数中:
loop.run_until_complete(receive_msg(connection))
根据需要,启动的线程没有 运行 事件循环,并且不能使用函数 listen
正在使用的事件循环,因此必须创建一个新的事件循环。如果此 thread/event 循环不使用来自差异事件循环的任何资源(即连接),那就没问题了:
import websockets
import asyncio
from threading import Thread
URL = "ws://localhost:4000"
async def listen() -> None:
async with websockets.connect(URL) as ws:
t = Thread(target=receiver_thread)
t.start()
def receiver_thread():
print("I`m at listener`s thread")
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(receive_msg())
async def receive_msg() -> None:
await asyncio.sleep(2)
print('I just slept for 2 seconds')
asyncio.get_event_loop().run_until_complete(listen())
打印:
I`m at listener`s thread
I just slept for 2 seconds
我看不出真正需要 运行 基于你展示的最少代码的线程中的任何东西,但假设你省略了显示对接收到的消息的一些处理,而 asyncio
单独不是足够了,那么也许您需要做的就是在当前 运行 循环中接收消息(在函数 listen
中)并使用线程处理消息:
from typing import Dict
import websockets
import asyncio
import json
from threading import Thread
URL = "my localhost webserver"
async def listen() -> None:
input("Press enter to connect.")
async with websockets.connect(URL) as ws:
msg_initial: Dict[str,str] = get_dict()
await ws.send(json.dumps(msg_initial))
while True:
msg = await ws.recv()
print(f"Server: {msg}")
# Non-daemon threads so program will not end until these threads terminate:
t = Thread(target=process_msg, args=(msg,))
t.start()
asyncio.get_event_loop().run_until_complete(listen())
更新
根据你对我关于创建聊天程序的回答的最后评论,你应该使用纯多线程或纯异步来实现它。这是使用 asyncio 的粗略概述:
import websockets
import asyncio
import aioconsole
URL = "my localhost webserver"
async def receiver(connection):
while True:
msg = await connection.recv()
print(f"\nServer: {msg}")
async def sender(connection):
while True:
msg = await aioconsole.ainput('\nEnter msg: ')
await connection.send(msg)
async def chat() -> None:
async with websockets.connect(URL) as ws:
await asyncio.gather(
receiver(ws),
sender(ws)
)
asyncio.get_event_loop().run_until_complete(chat())
但是,您可以使用 asyncio 执行的用户输入类型可能会受到限制。因此,我认为多线程可能是更好的方法。