无法从 Python 中的另一个线程函数触发异步函数

Cannot trigger an async function from another threaded function in Python

我正在制作一个 discord 机器人,它会不时地使用请求获取 json,然后将相关信息发送到特定频道。

我有以下 classes:

我现在有两个问题:

虽然 tasker 在不同的线程上,但每次我尝试通过 getInfoHelper 交谈时,它都会给我错误 RuntimeError: no running event loopRuntimeWarning: coroutine 'getInfo.discordmsg' was never awaited

如果我不 运行 它在不同的线程上,但是,它确实在 TestStatus: 1 上工作,但它会使 Helper 卡住并停止 运行 TestStatus: 2

无论如何,这是代码

import requests
import asyncio
import discord
from discord.ext import commands, tasks
from datetime import datetime, timedelta
import threading

class Helper(discord.Client):
    async def on_ready(self):
        global discordbot, taskervar
        servername = 'ServerName'
        discordbot = self
        self.servidores = dict()
        self.canais = dict()
        for i in range(len(self.guilds)):
            self.servidores[self.guilds[i].name] = {}
            self.servidores[self.guilds[i].name]['guild']=self.guilds[i]
            servidor = self.guilds[i]
            for k in range(len(servidor.channels)):
                canal = servidor.channels[k]
                self.canais[str(canal.name)] = canal
            if 'bottalk' not in self.canais.keys():
                newchan = await self.servidores[self.guilds[i].name]['guild'].create_text_channel('bottalk')
                self.canais[str(newchan.name)] = newchan
            self.servidores[self.guilds[i].name]['canais'] = self.canais
        self.bottalk = self.get_channel(self.servidores[servername]['canais']['bottalk'].id)
        await self.msg("Bot online: " + converteHora(datetime.now(),True))
        print(f'{self.user} has connected to Discord!')
        taskervar.startprocess()
    async def msg(self, msg):
        await self.bottalk.send(msg)
    async def on_message(self, message):
        if message.author == self.user:
            return
        else:
            print(message)

class tasker:
    def __init__(self):
        global discordbot, taskervar
        print('Tasker start')
        taskervar = self
        self.waiter = threading.Event()
        self.lastupdate = datetime.now()
        self.nextupdate = datetime.now()
        self.thread = threading.Thread(target=self.requests)
    def startprocess(self):
        if not self.thread.is_alive():
            self.waiter = threading.Event()
            self.interval = 60*5
            self.thread = threading.Thread(target=self.requests)
            self.thread.start()
    def requests(self):
        while not self.waiter.is_set():
            getInfo()
            self.lastupdate = datetime.now()
            self.nextupdate = datetime.now()+timedelta(seconds=self.interval)
            self.waiter.wait(self.interval)
    def stopprocess(self):
        self.waiter.set()

class getInfo:
    def __init__(self):
        global discordbot, taskervar
        self.requests()
    async def discordmsg(self,msg):
        await discordbot.msg(msg)
    def requests(self):
        jsondata = {"TestStatus": 1}
        if  jsondata['TestStatus'] == 1:
            print('here')
            asyncio.create_task(self.discordmsg("SOMETHING WENT WRONG"))
            taskervar.stopprocess()
            return
        elif jsondata['TestStatus'] == 2:
            print('test')
            hora = converteHora(datetime.now(),True)
            asyncio.create_task(self.discordmsg(str("Everything is fine but not now: " + hora )))
            print('test2')

            
def converteHora(dateUTC, current=False):
    if current:
        response = (dateUTC.strftime("%d/%m/%Y, %H:%M:%S"))
    else:
        response = (dateutil.parser.isoparse(dateUTC)-timedelta(hours=3)).strftime("%d/%m/%Y, %H:%M:%S")
    return response

async def main():
    TOKEN = 'TOKEN GOES HERE'
    tasker()
    await asyncio.gather(
        await Helper().start(TOKEN)
    )


if __name__ == '__main__':
    asyncio.run(main())

你的主要问题是你没有让你的辅助线程访问 asyncio 事件循环。您不能只 await and/or create_task 全局对象上的协程(首先要避免使用全局对象的众多原因之一)。您可以通过以下方式修改代码以实现此目的:

class tasker: 
   def __init__(self):
      # ...
      self.loop = asyncio.get_running_loop()
    # ...

class getInfo:
    #...
    def requests(self):
        # replace the create_tasks calls with this. 
        asyncio.run_coroutine_threadsafe(self.discordmsg, taskervar.loop)

这使用了你的全局变量,因为我不想重写你的整个程序,但我仍然强烈建议避免使用它们并考虑自己重写。

综上所述,我怀疑您仍然会遇到此错误:

If I dont run it on a different thread, however, it does work on the TestStatus: 1 but it makes Helper get stuck and stop running with TestStatus: 2

我不知道是什么导致了这个问题,我 运行 在我的机器上重现这个问题时遇到了麻烦。您的代码很难阅读,并且缺少一些可重现性的细节。我想这是你一开始没有得到答案的部分原因。我相信您知道这篇文章,但可能值得重新访问以获取共享代码的更好实践。 https://whosebug.com/help/minimal-reproducible-example