运行 同一应用程序中的 Flask 和 Discord 机器人
Running Flask & a Discord bot in the same application
我正在 Python 构建一个 Discord 机器人,并希望接收来自 Twitch.tv 的 API 的 HTTP 请求(参见 Webhooks Guide & Webhooks Reference)(订阅事件喜欢;X streamer 已经上线)并根据从 Twitch 收到的 HTTP(POST 或 GET)请求的内容,在 Discord 机器人上做一些事情,例如:在文本频道上输出一条消息。
我正在使用 discord.py Python Discord API/Library。
我调查了此事,发现 Flask 似乎是网络服务器接收这些请求的极简主义选择。
我应该先说我是 Python 的新手,而且我以前从未使用过 Flask。
现在。问题是我似乎无法找到 运行 我的 discord 机器人中的 Flask 服务器的方法。
我已经尝试将这个简单的代码添加到我的 discord.py 脚本中:
from flask import Flask, request
app = Flask(__name__)
@app.route('/posts', methods=['POST'])
def result():
print(request.form['sched'])
# Send a message to a discord text channel etc...
return 'Received !'
当我 运行 我的 discord.py 脚本看起来像这样:
(为了缩短篇幅删除了一些命令和功能)
import discord
import asyncio
from flask import Flask, request
app = Flask(__name__)
@app.route('/posts', methods=['POST'])
def result():
print(request.form['sched'])
# Send a message to a discord text channel etc...
return 'Received !'
client = discord.Client()
@client.event
async def on_ready():
print('Logged in as')
print(client.user.name)
print(client.user.id)
print('------')
@client.event
async def on_message(message):
if message.author == client.user:
return
content = message.content
fullUser = message.author.name+'#'+message.author.discriminator
print(str(message.timestamp)+" #"+message.channel.name+" "+fullUser+": "+str(content.encode('ascii', 'ignore').decode('ascii')))
if content.startswith('!'):
content = content[1:]
if content.startswith('test'):
counter = 0
tmp = await client.send_message(message.channel, 'Calculating messages...')
async for log in client.logs_from(message.channel, limit=100):
if log.author == message.author:
counter += 1
await client.edit_message(tmp, 'You have {} messages.'.format(counter))
client.run('MyTokenHere')
似乎如果我将 flask 指向 discord.py(上面)和 运行 它,它会启动代码,到达 "client.run('MyTokenHere')" 不和谐的部分,并且就此止步,运行 discord 机器人。直到我通过执行 Ctrl+C 退出 bot,实际的 Flask 服务器才启动,但现在 discord bot 已断开连接,不再进行任何处理。
同样的问题仍然存在,例如,如果我在我的代码中的某处添加 "app.run()"(在调用 "client.run()" 启动 Discord 机器人部分之前)以启动 Flask 服务器;它只会 运行 烧瓶,卡在上面直到我 Ctrl+C 从烧瓶服务器中退出,然后它会继续启动 Discord 机器人。
最终,我需要使用 Discord API 并且我需要连接到 Discord API 网关和所有好的爵士乐来实际发送消息到机器人的频道,所以我真的不知道知道在这里做什么。
所以。我想我已经尽力解释了我最终想要在这里实现的目标,希望有人可以帮助我找到一种方法来使它与 Flask 一起工作,或者如果有更好更简单的方法,请提供不同的解决方案.
正如好心的评论者告诉我的那样; threading 似乎是正确的选择。
谢谢大家!
或者您可以使用终端多路复用器,tmux
到 运行 它们独立!。
如果您 运行 在 Linux 平台上,tmux python3 flaskapp.py
将 运行 Flask 应用程序,而您可以独立地 运行 Discord 机器人。
这是 discord.py
中的 cog 示例
我为 dbl(Discord Bot Lists)做了这个东西,你可以实现,你需要的东西。
注意:运行您的带有 webprocess 的 heroku 应用程序
示例:
网站:pythonmain.py
然后继续https://uptimerobot.com/
并设置为每 5 分钟 ping 您的网络应用
from aiohttp import web
from discord.ext import commands, tasks
import discord
import os
import aiohttp
app = web.Application()
routes = web.RouteTableDef()
def setup(bot):
bot.add_cog(Webserver(bot))
class Webserver(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.web_server.start()
@routes.get('/')
async def welcome(request):
return web.Response(text="Hello, world")
@routes.post('/dbl')
async def dblwebhook(request):
if request.headers.get('authorization') == '3mErTJMYFt':
data = await request.json()
user = self.bot.get_user(data['user']) or await self.bot.fetch_user(data['user'])
if user is None:
return
_type = f'Tested!' if data['type'] == 'test' else f'Voted!'
upvoted_bot = f'<@{data["bot"]}>'
embed = discord.Embed(title=_type, colour=discord.Color.blurple())
embed.description = f'**Upvoter :** {user.mention} Just {_type}' + f'\n**Upvoted Bot :** {upvoted_bot}'
embed.set_thumbnail(url=user.avatar_url)
channel = self.bot.get_channel(5645646545142312312)
await channel.send(embed=embed)
return 200
self.webserver_port = os.environ.get('PORT', 5000)
app.add_routes(routes)
@tasks.loop()
async def web_server(self):
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host='0.0.0.0', port=self.webserver_port)
await site.start()
@web_server.before_loop
async def web_server_before_loop(self):
await self.bot.wait_until_ready()
另一个很酷的方法,如果你不打算使用烧瓶扩展,使用夸脱而不是烧瓶,那么它对你来说会非常简单。
注意:运行你的带有 webprocess 的 heroku 应用程序
示例:
网页:pythonmain.py
然后继续https://uptimerobot.com/并设置为每 5 分钟 ping 您的网络应用程序
# Code Example
from discord.ext import commands
from quart import Quart
import os
app = Quart(__name__)
bot = commands.Bot('!')
@bot.command()
async def something(ctx):
...
"""
Note: On Heroku you cant bind your webserver with 5000 port as they aren't static.
To fix above problem you will have to get dynamic port from the environment variable and you are good to go.
"""
PORT = os.environ.get('PORT')
bot.loop.create_task(app.run_task('0.0.0.0', PORT))
bot.run('Token')
您可以将 asyncio 与 flask 一起使用来执行此操作
import discord
import asyncio
class PrintDiscordChannelsClient(discord.Client):
def __init__(self, *args, **kwargs):
self.guild_id = kwargs.pop('guild_id')
super().__init__(*args, **kwargs)
async def on_ready(self):
try:
await self.wait_until_ready()
guild = self.get_guild(int(self.guild_id))
print(f'{guild.name} is ready!')
channels = guild.text_channels
print(f'{(channels)} channels found')
await self.close()
except Exception as e:
print(e)
await self.close()
async def print_discord_channels(guild_id):
client = PrintDiscordChannelsClient(guild_id=guild_id)
await client.start('DISCORD_BOT_TOKEN')
@application.route("/discord-api", methods=["GET"])
def discord_api():
guild_id = request.args.get('guild_id')
asyncio.run(print_discord_channels(guild_id=guild_id))
return make_response("Success", 200)
我正在 Python 构建一个 Discord 机器人,并希望接收来自 Twitch.tv 的 API 的 HTTP 请求(参见 Webhooks Guide & Webhooks Reference)(订阅事件喜欢;X streamer 已经上线)并根据从 Twitch 收到的 HTTP(POST 或 GET)请求的内容,在 Discord 机器人上做一些事情,例如:在文本频道上输出一条消息。
我正在使用 discord.py Python Discord API/Library。
我调查了此事,发现 Flask 似乎是网络服务器接收这些请求的极简主义选择。
我应该先说我是 Python 的新手,而且我以前从未使用过 Flask。
现在。问题是我似乎无法找到 运行 我的 discord 机器人中的 Flask 服务器的方法。
我已经尝试将这个简单的代码添加到我的 discord.py 脚本中:
from flask import Flask, request
app = Flask(__name__)
@app.route('/posts', methods=['POST'])
def result():
print(request.form['sched'])
# Send a message to a discord text channel etc...
return 'Received !'
当我 运行 我的 discord.py 脚本看起来像这样: (为了缩短篇幅删除了一些命令和功能)
import discord
import asyncio
from flask import Flask, request
app = Flask(__name__)
@app.route('/posts', methods=['POST'])
def result():
print(request.form['sched'])
# Send a message to a discord text channel etc...
return 'Received !'
client = discord.Client()
@client.event
async def on_ready():
print('Logged in as')
print(client.user.name)
print(client.user.id)
print('------')
@client.event
async def on_message(message):
if message.author == client.user:
return
content = message.content
fullUser = message.author.name+'#'+message.author.discriminator
print(str(message.timestamp)+" #"+message.channel.name+" "+fullUser+": "+str(content.encode('ascii', 'ignore').decode('ascii')))
if content.startswith('!'):
content = content[1:]
if content.startswith('test'):
counter = 0
tmp = await client.send_message(message.channel, 'Calculating messages...')
async for log in client.logs_from(message.channel, limit=100):
if log.author == message.author:
counter += 1
await client.edit_message(tmp, 'You have {} messages.'.format(counter))
client.run('MyTokenHere')
似乎如果我将 flask 指向 discord.py(上面)和 运行 它,它会启动代码,到达 "client.run('MyTokenHere')" 不和谐的部分,并且就此止步,运行 discord 机器人。直到我通过执行 Ctrl+C 退出 bot,实际的 Flask 服务器才启动,但现在 discord bot 已断开连接,不再进行任何处理。
同样的问题仍然存在,例如,如果我在我的代码中的某处添加 "app.run()"(在调用 "client.run()" 启动 Discord 机器人部分之前)以启动 Flask 服务器;它只会 运行 烧瓶,卡在上面直到我 Ctrl+C 从烧瓶服务器中退出,然后它会继续启动 Discord 机器人。 最终,我需要使用 Discord API 并且我需要连接到 Discord API 网关和所有好的爵士乐来实际发送消息到机器人的频道,所以我真的不知道知道在这里做什么。
所以。我想我已经尽力解释了我最终想要在这里实现的目标,希望有人可以帮助我找到一种方法来使它与 Flask 一起工作,或者如果有更好更简单的方法,请提供不同的解决方案.
正如好心的评论者告诉我的那样; threading 似乎是正确的选择。 谢谢大家!
或者您可以使用终端多路复用器,tmux
到 运行 它们独立!。
如果您 运行 在 Linux 平台上,tmux python3 flaskapp.py
将 运行 Flask 应用程序,而您可以独立地 运行 Discord 机器人。
这是 discord.py
中的 cog 示例我为 dbl(Discord Bot Lists)做了这个东西,你可以实现,你需要的东西。
注意:运行您的带有 webprocess 的 heroku 应用程序
示例:
网站:pythonmain.py
然后继续https://uptimerobot.com/ 并设置为每 5 分钟 ping 您的网络应用
from aiohttp import web
from discord.ext import commands, tasks
import discord
import os
import aiohttp
app = web.Application()
routes = web.RouteTableDef()
def setup(bot):
bot.add_cog(Webserver(bot))
class Webserver(commands.Cog):
def __init__(self, bot):
self.bot = bot
self.web_server.start()
@routes.get('/')
async def welcome(request):
return web.Response(text="Hello, world")
@routes.post('/dbl')
async def dblwebhook(request):
if request.headers.get('authorization') == '3mErTJMYFt':
data = await request.json()
user = self.bot.get_user(data['user']) or await self.bot.fetch_user(data['user'])
if user is None:
return
_type = f'Tested!' if data['type'] == 'test' else f'Voted!'
upvoted_bot = f'<@{data["bot"]}>'
embed = discord.Embed(title=_type, colour=discord.Color.blurple())
embed.description = f'**Upvoter :** {user.mention} Just {_type}' + f'\n**Upvoted Bot :** {upvoted_bot}'
embed.set_thumbnail(url=user.avatar_url)
channel = self.bot.get_channel(5645646545142312312)
await channel.send(embed=embed)
return 200
self.webserver_port = os.environ.get('PORT', 5000)
app.add_routes(routes)
@tasks.loop()
async def web_server(self):
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, host='0.0.0.0', port=self.webserver_port)
await site.start()
@web_server.before_loop
async def web_server_before_loop(self):
await self.bot.wait_until_ready()
另一个很酷的方法,如果你不打算使用烧瓶扩展,使用夸脱而不是烧瓶,那么它对你来说会非常简单。
注意:运行你的带有 webprocess 的 heroku 应用程序
示例: 网页:pythonmain.py
然后继续https://uptimerobot.com/并设置为每 5 分钟 ping 您的网络应用程序
# Code Example
from discord.ext import commands
from quart import Quart
import os
app = Quart(__name__)
bot = commands.Bot('!')
@bot.command()
async def something(ctx):
...
"""
Note: On Heroku you cant bind your webserver with 5000 port as they aren't static.
To fix above problem you will have to get dynamic port from the environment variable and you are good to go.
"""
PORT = os.environ.get('PORT')
bot.loop.create_task(app.run_task('0.0.0.0', PORT))
bot.run('Token')
您可以将 asyncio 与 flask 一起使用来执行此操作
import discord
import asyncio
class PrintDiscordChannelsClient(discord.Client):
def __init__(self, *args, **kwargs):
self.guild_id = kwargs.pop('guild_id')
super().__init__(*args, **kwargs)
async def on_ready(self):
try:
await self.wait_until_ready()
guild = self.get_guild(int(self.guild_id))
print(f'{guild.name} is ready!')
channels = guild.text_channels
print(f'{(channels)} channels found')
await self.close()
except Exception as e:
print(e)
await self.close()
async def print_discord_channels(guild_id):
client = PrintDiscordChannelsClient(guild_id=guild_id)
await client.start('DISCORD_BOT_TOKEN')
@application.route("/discord-api", methods=["GET"])
def discord_api():
guild_id = request.args.get('guild_id')
asyncio.run(print_discord_channels(guild_id=guild_id))
return make_response("Success", 200)