Discord.py 后台任务中断但任务仍在等待处理(使用 websocket)
Discord.py background task breaks but task is still pending (using websocket)
一般来说,我对 asyncio 和异步编程非常缺乏经验,所以在尝试将同步的 websocket 模块与异步的 discord.py 模块一起使用时一直很吃力。我正在尝试设置一个持续侦听 websocket 消息的机器人:一旦接收到数据,它就执行一些计算并在 Discord 中发送一条消息。它应该无限期地运行,而且 websocket 的来源会定期更改。下面是一段带注释的代码,展示了我想要实现的功能:
import requests
import websocket
import discord
import asyncio
from time import sleep
client = discord.Client() # Initialize the discord client
class Wrapper:
    """Holds the bot's shared state as class attributes.

    This just holds some variables so that they can be used without
    worrying about scope.
    """

    data = None  # payload stored by the websocket on_message callback; None until set
    first_time = True  # True until the main loop has sent its initial embed
    is_running = False  # set to True once a socket URL has been obtained
    socket_url = None  # websocket endpoint; None while it is still unknown
async def init_bot():  # The problem begins here
    """Create a websocket app for ``Wrapper.socket_url`` and run it in a loop.

    Valid incoming payloads are stored in ``Wrapper.data``.

    NOTE(review): this coroutine contains no ``await``. ``ws.run_forever()``
    is a synchronous call inside ``while True``, so awaiting this coroutine
    never hands control back to the event loop.
    """
    def on_message(ws, message):
        # is_valid is just a name for a special comparison I do with the message
        if is_valid(message['data']):
            Wrapper.data = message['data']
            print('Received valid data')

    def on_error(ws, error):
        print('There was an error connecting to the websocket.')

    ws = websocket.WebSocketApp(Wrapper.socket_url,
                                on_message=on_message,
                                on_error=on_error)
    while True:
        ws.run_forever()  # I believe this is connected to the discord event loop?
        # Using ws.close() here breaks the program when it receives data for the second time
async def start():
    """Background task: start the websocket, then relay its data to Discord.

    Waits until the client is ready, awaits ``init_bot()``, and then loops:
    on the first pass it posts a waiting embed and polls the data source
    until a socket URL is available; while running it waits for
    ``Wrapper.data``, posts it, and resets the state after the last message.
    """
    import json  # needed by get_response; not among the file's top-level imports

    await client.wait_until_ready()

    def get_response(hq_id, headers):
        """Fetch the data source; return the decoded JSON, or None if it is not JSON."""
        response = requests.get('my_data_source')
        try:
            return json.loads(response.text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None

    print('Starting websocket')
    await init_bot()
    while True:  # This should run forever, but the outer coroutine gets task pending errors
        print('Running main loop')
        if Wrapper.first_time:
            Wrapper.first_time = False
            on_login = await client.send_message(Config.public_channel, embed=Config.waiting_embed())  # Sends an embed to a channel
            while Wrapper.socket_url is None:
                response = get_response(ID, HEADERS)
                try:
                    # Assume this sets the socket url to a websocket in the form ws://anyhost.com
                    socket_url = response['socket_url']
                except (TypeError, KeyError):
                    # The response will only sometimes include a proper socket_url, so we wait for one
                    # (TypeError covers a None response, KeyError a missing key).
                    await asyncio.sleep(60)
                    continue
                Wrapper.socket_url = socket_url
                await client.edit_message(on_login, embed=Config.connect_embed())
                Wrapper.is_running = True
                await asyncio.sleep(3)
        if Wrapper.is_running:
            while Wrapper.data is None:  # Is this blocking? I essentially want this while loop to end when we have data from line 18
                await asyncio.sleep(1)
            if Wrapper.data is not None:
                data_message = await client.send_message(Config.public_channel, embed=Wrapper.data[0])
                # React to the message that was just sent.
                await client.add_reaction(data_message, '')
                await client.add_reaction(data_message, '')
                if Wrapper.data[1] == True:  # Marks the last message, so we want to reset the bot to its initial state
                    Wrapper.is_running = False
                    Wrapper.first_time = True
                    Wrapper.socket_url = None
                    await asyncio.sleep(100)  # Sleep for a period of time in order to make sure the socket url is closed
                Wrapper.data = None
                await asyncio.sleep(3)
    print('While loop ended?')
@client.event
async def on_ready():
    """Print the bot banner and schedule ``start()`` on the client's event loop."""
    print('My Bot\n')
    client.loop.create_task(start())
client.run('<TOKEN>')
我已经尝试了上述的几种变体,但我通常得到的错误是这样的:
File "mybot.py", line 247, in <module>
client.loop.create_task(start())
task: <Task pending coro=<start() running at mybot.py> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(1016)(), <TaskWakeupMethWrapper object at 0x000002545FFFC8B8>()]
您不能直接将支持 asyncio 的代码(例如 discord.py)与同步的 websocket 代码混用。由于 init_bot 中没有任何 await,调用 await init_bot() 会完全阻塞事件循环。
相反,您需要在单独的线程中运行 websocket 代码(即 init_bot 函数),并在协程中 await 相应的事件。例如:
def init_bot(loop, w):
    """Websocket side of the bot, intended to run in its own thread (sketch).

    loop: the asyncio event loop on which ``w.event`` is awaited.
    w: shared state object; ``on_message`` stores the payload in ``w.data``
       and signals ``w.event``.
    """
    def on_message(ws, message):
        w.data = message['data']
        # This callback runs outside the event loop's thread, so the event
        # is set by scheduling the call on the loop rather than directly.
        loop.call_soon_threadsafe(w.event.set)
    # ...
async def start():
    """Run the websocket code in a thread and await its data (sketch)."""
    import threading  # not among the file's top-level imports

    # ...
    loop = asyncio.get_event_loop()
    w = Wrapper()
    w.event = asyncio.Event()
    # The blocking websocket code gets its own thread so it cannot stall the loop.
    threading.Thread(target=lambda: init_bot(loop, w)).start()
    # ...

    # instead of while Wrapper.data is None ...
    await w.event.wait()
    # ... process data
    w.event.clear()
一般来说,我对 asyncio 和异步编程非常缺乏经验,所以在尝试将同步的 websocket 模块与异步的 discord.py 模块一起使用时一直很吃力。我正在尝试设置一个持续侦听 websocket 消息的机器人:一旦接收到数据,它就执行一些计算并在 Discord 中发送一条消息。它应该无限期地运行,而且 websocket 的来源会定期更改。下面是一段带注释的代码,展示了我想要实现的功能:
import requests
import websocket
import discord
import asyncio
from time import sleep
client = discord.Client() # Initialize the discord client
class Wrapper:
    """Holds the bot's shared state as class attributes.

    This just holds some variables so that they can be used without
    worrying about scope.
    """

    data = None  # payload stored by the websocket on_message callback; None until set
    first_time = True  # True until the main loop has sent its initial embed
    is_running = False  # set to True once a socket URL has been obtained
    socket_url = None  # websocket endpoint; None while it is still unknown
async def init_bot():  # The problem begins here
    """Create a websocket app for ``Wrapper.socket_url`` and run it in a loop.

    Valid incoming payloads are stored in ``Wrapper.data``.

    NOTE(review): this coroutine contains no ``await``. ``ws.run_forever()``
    is a synchronous call inside ``while True``, so awaiting this coroutine
    never hands control back to the event loop.
    """
    def on_message(ws, message):
        # is_valid is just a name for a special comparison I do with the message
        if is_valid(message['data']):
            Wrapper.data = message['data']
            print('Received valid data')

    def on_error(ws, error):
        print('There was an error connecting to the websocket.')

    ws = websocket.WebSocketApp(Wrapper.socket_url,
                                on_message=on_message,
                                on_error=on_error)
    while True:
        ws.run_forever()  # I believe this is connected to the discord event loop?
        # Using ws.close() here breaks the program when it receives data for the second time
async def start():
    """Background task: start the websocket, then relay its data to Discord.

    Waits until the client is ready, awaits ``init_bot()``, and then loops:
    on the first pass it posts a waiting embed and polls the data source
    until a socket URL is available; while running it waits for
    ``Wrapper.data``, posts it, and resets the state after the last message.
    """
    import json  # needed by get_response; not among the file's top-level imports

    await client.wait_until_ready()

    def get_response(hq_id, headers):
        """Fetch the data source; return the decoded JSON, or None if it is not JSON."""
        response = requests.get('my_data_source')
        try:
            return json.loads(response.text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None

    print('Starting websocket')
    await init_bot()
    while True:  # This should run forever, but the outer coroutine gets task pending errors
        print('Running main loop')
        if Wrapper.first_time:
            Wrapper.first_time = False
            on_login = await client.send_message(Config.public_channel, embed=Config.waiting_embed())  # Sends an embed to a channel
            while Wrapper.socket_url is None:
                response = get_response(ID, HEADERS)
                try:
                    # Assume this sets the socket url to a websocket in the form ws://anyhost.com
                    socket_url = response['socket_url']
                except (TypeError, KeyError):
                    # The response will only sometimes include a proper socket_url, so we wait for one
                    # (TypeError covers a None response, KeyError a missing key).
                    await asyncio.sleep(60)
                    continue
                Wrapper.socket_url = socket_url
                await client.edit_message(on_login, embed=Config.connect_embed())
                Wrapper.is_running = True
                await asyncio.sleep(3)
        if Wrapper.is_running:
            while Wrapper.data is None:  # Is this blocking? I essentially want this while loop to end when we have data from line 18
                await asyncio.sleep(1)
            if Wrapper.data is not None:
                data_message = await client.send_message(Config.public_channel, embed=Wrapper.data[0])
                # React to the message that was just sent.
                await client.add_reaction(data_message, '')
                await client.add_reaction(data_message, '')
                if Wrapper.data[1] == True:  # Marks the last message, so we want to reset the bot to its initial state
                    Wrapper.is_running = False
                    Wrapper.first_time = True
                    Wrapper.socket_url = None
                    await asyncio.sleep(100)  # Sleep for a period of time in order to make sure the socket url is closed
                Wrapper.data = None
                await asyncio.sleep(3)
    print('While loop ended?')
@client.event
async def on_ready():
    """Print the bot banner and schedule ``start()`` on the client's event loop."""
    print('My Bot\n')
    client.loop.create_task(start())
client.run('<TOKEN>')
我已经尝试了上述的几种变体,但我通常得到的错误是这样的:
File "mybot.py", line 247, in <module>
client.loop.create_task(start())
task: <Task pending coro=<start() running at mybot.py> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(1016)(), <TaskWakeupMethWrapper object at 0x000002545FFFC8B8>()]
您不能直接将支持 asyncio 的代码(例如 discord.py)与同步的 websocket 代码混用。由于 init_bot 中没有任何 await,调用 await init_bot() 会完全阻塞事件循环。
相反,您需要在单独的线程中运行 websocket 代码(即 init_bot 函数),并在协程中 await 相应的事件。例如:
def init_bot(loop, w):
    """Websocket side of the bot, intended to run in its own thread (sketch).

    loop: the asyncio event loop on which ``w.event`` is awaited.
    w: shared state object; ``on_message`` stores the payload in ``w.data``
       and signals ``w.event``.
    """
    def on_message(ws, message):
        w.data = message['data']
        # This callback runs outside the event loop's thread, so the event
        # is set by scheduling the call on the loop rather than directly.
        loop.call_soon_threadsafe(w.event.set)
    # ...
async def start():
    """Run the websocket code in a thread and await its data (sketch)."""
    import threading  # not among the file's top-level imports

    # ...
    loop = asyncio.get_event_loop()
    w = Wrapper()
    w.event = asyncio.Event()
    # The blocking websocket code gets its own thread so it cannot stall the loop.
    threading.Thread(target=lambda: init_bot(loop, w)).start()
    # ...

    # instead of while Wrapper.data is None ...
    await w.event.wait()
    # ... process data
    w.event.clear()