Python telegram bot 的 `get_chat_members_count` & 避免洪水限制或如何使用包装器和装饰器
Python telegram bot's `get_chat_members_count` & avoiding flood limits or how to use wrappers and decorators
我正在检查大约 3000 个电报聊天的列表,以使用 get_chat_members_count
方法获取和检索每个聊天中的聊天成员数量。
在某些时候,我达到了洪水限制并被 Telegram BOT 暂时禁止。
Traceback (most recent call last):
File "C:\Users\alexa\Desktop\ico_icobench_2.py", line 194, in <module>
ico_tel_memb = bot.get_chat_members_count('@' + ico_tel_trim, timeout=60)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 60, in decorator
result = func(self, *args, **kwargs)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 2006, in get_chat_members_count
result = self._request.post(url, data, timeout=timeout)
File "C:\Python36\lib\site-packages\telegram\utils\request.py", line 278, in post
**urlopen_kwargs)
File "C:\Python36\lib\site-packages\telegram\utils\request.py", line 208, in _request_wrapper
message = self._parse(resp.data)
File "C:\Python36\lib\site-packages\telegram\utils\request.py", line 168, in _parse
raise RetryAfter(retry_after)
telegram.error.RetryAfter: Flood control exceeded. Retry in 85988 seconds
python-telegram-bot
wiki 提供了有关如何避免洪水限制的详细解释和示例 here。
但是,我正在努力实施他们的解决方案,我希望这里有人比我更了解这方面的知识。
我确实复制并粘贴了他们的示例,但毫无疑问无法正常工作,因为我是 python 的新手。我猜我遗漏了一些定义,但我不确定是哪个。这是下面的代码,之后是我收到的第一个错误。显然TOKEN需要换成你的token。
import telegram.bot
from telegram.ext import messagequeue as mq
class MQBot(telegram.bot.Bot):
'''A subclass of Bot which delegates send method handling to MQ'''
def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs):
super(MQBot, self).__init__(*args, **kwargs)
# below 2 attributes should be provided for decorator usage
self._is_messages_queued_default = is_queued_def
self._msg_queue = mqueue or mq.MessageQueue()
def __del__(self):
try:
self._msg_queue.stop()
except:
pass
super(MQBot, self).__del__()
@mq.queuedmessage
def send_message(self, *args, **kwargs):
'''Wrapped method would accept new `queued` and `isgroup`
OPTIONAL arguments'''
return super(MQBot, self).send_message(*args, **kwargs)
if __name__ == '__main__':
from telegram.ext import MessageHandler, Filters
import os
token = os.environ.get('TOKEN')
# for test purposes limit global throughput to 3 messages per 3 seconds
q = mq.MessageQueue(all_burst_limit=3, all_time_limit_ms=3000)
testbot = MQBot(token, mqueue=q)
upd = telegram.ext.updater.Updater(bot=testbot)
def reply(bot, update):
# tries to echo 10 msgs at once
chatid = update.message.chat_id
msgt = update.message.text
print(msgt, chatid)
for ix in range(10):
bot.send_message(chat_id=chatid, text='%s) %s' % (ix + 1, msgt))
hdl = MessageHandler(Filters.text, reply)
upd.dispatcher.add_handler(hdl)
upd.start_polling()
我得到的第一个错误是:
Traceback (most recent call last):
File "C:\Users\alexa\Desktop\z test.py", line 34, in <module>
testbot = MQBot(token, mqueue=q)
File "C:\Users\alexa\Desktop\z test.py", line 9, in __init__
super(MQBot, self).__init__(*args, **kwargs)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 108, in __init__
self.token = self._validate_token(token)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 129, in _validate_token
if any(x.isspace() for x in token):
TypeError: 'NoneType' object is not iterable
我遇到的第二个问题是如何将包装器和装饰器与 get_chat_members_count
一起使用。
我添加到示例中的代码是:
@mq.queuedmessage
def get_chat_members_count(self, *args, **kwargs):
return super(MQBot, self).get_chat_members_count(*args, **kwargs)
但是没有任何反应,我也不知道聊天成员的数量。我也没有说我需要计算哪个聊天,所以我什么也没得到也就不足为奇了,但是我应该把电报聊天 ID 放在哪里?
您收到此错误是因为 MQBot 收到一个空令牌。由于某种原因,它没有引发描述性异常,而是意外崩溃。
那为什么token是空的?看来您使用 os.environ.get
不正确。 os.environ
部分是 dictionary and its' method get
allows one to access dict's contents safely. According to docs:
get(key[, default])
Return the value for key if key is in the dictionary, else default. If default is not given, it defaults to None, so that this method never raises a KeyError.
根据您的问题,在这部分 token = os.environ.get('TOKEN')
您将令牌本身作为密钥传递。相反,您应该传递包含您的令牌的环境变量的名称。
你可以解决这个问题,要么像这样 token = 'TOKEN'
重写那个部分,要么通过 setting environmental variable correctly 并通过正确的名称从 os.environ.get
访问它。
我正在检查大约 3000 个电报聊天的列表,以使用 get_chat_members_count
方法获取和检索每个聊天中的聊天成员数量。
在某些时候,我达到了洪水限制并被 Telegram BOT 暂时禁止。
Traceback (most recent call last):
File "C:\Users\alexa\Desktop\ico_icobench_2.py", line 194, in <module>
ico_tel_memb = bot.get_chat_members_count('@' + ico_tel_trim, timeout=60)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 60, in decorator
result = func(self, *args, **kwargs)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 2006, in get_chat_members_count
result = self._request.post(url, data, timeout=timeout)
File "C:\Python36\lib\site-packages\telegram\utils\request.py", line 278, in post
**urlopen_kwargs)
File "C:\Python36\lib\site-packages\telegram\utils\request.py", line 208, in _request_wrapper
message = self._parse(resp.data)
File "C:\Python36\lib\site-packages\telegram\utils\request.py", line 168, in _parse
raise RetryAfter(retry_after)
telegram.error.RetryAfter: Flood control exceeded. Retry in 85988 seconds
python-telegram-bot
wiki 提供了有关如何避免洪水限制的详细解释和示例 here。
但是,我正在努力实施他们的解决方案,我希望这里有人比我更了解这方面的知识。
我确实复制并粘贴了他们的示例,但毫无疑问无法正常工作,因为我是 python 的新手。我猜我遗漏了一些定义,但我不确定是哪个。这是下面的代码,之后是我收到的第一个错误。显然TOKEN需要换成你的token。
import telegram.bot
from telegram.ext import messagequeue as mq
class MQBot(telegram.bot.Bot):
'''A subclass of Bot which delegates send method handling to MQ'''
def __init__(self, *args, is_queued_def=True, mqueue=None, **kwargs):
super(MQBot, self).__init__(*args, **kwargs)
# below 2 attributes should be provided for decorator usage
self._is_messages_queued_default = is_queued_def
self._msg_queue = mqueue or mq.MessageQueue()
def __del__(self):
try:
self._msg_queue.stop()
except:
pass
super(MQBot, self).__del__()
@mq.queuedmessage
def send_message(self, *args, **kwargs):
'''Wrapped method would accept new `queued` and `isgroup`
OPTIONAL arguments'''
return super(MQBot, self).send_message(*args, **kwargs)
if __name__ == '__main__':
from telegram.ext import MessageHandler, Filters
import os
token = os.environ.get('TOKEN')
# for test purposes limit global throughput to 3 messages per 3 seconds
q = mq.MessageQueue(all_burst_limit=3, all_time_limit_ms=3000)
testbot = MQBot(token, mqueue=q)
upd = telegram.ext.updater.Updater(bot=testbot)
def reply(bot, update):
# tries to echo 10 msgs at once
chatid = update.message.chat_id
msgt = update.message.text
print(msgt, chatid)
for ix in range(10):
bot.send_message(chat_id=chatid, text='%s) %s' % (ix + 1, msgt))
hdl = MessageHandler(Filters.text, reply)
upd.dispatcher.add_handler(hdl)
upd.start_polling()
我得到的第一个错误是:
Traceback (most recent call last):
File "C:\Users\alexa\Desktop\z test.py", line 34, in <module>
testbot = MQBot(token, mqueue=q)
File "C:\Users\alexa\Desktop\z test.py", line 9, in __init__
super(MQBot, self).__init__(*args, **kwargs)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 108, in __init__
self.token = self._validate_token(token)
File "C:\Python36\lib\site-packages\telegram\bot.py", line 129, in _validate_token
if any(x.isspace() for x in token):
TypeError: 'NoneType' object is not iterable
我遇到的第二个问题是如何将包装器和装饰器与 get_chat_members_count
一起使用。
我添加到示例中的代码是:
@mq.queuedmessage
def get_chat_members_count(self, *args, **kwargs):
return super(MQBot, self).get_chat_members_count(*args, **kwargs)
但是没有任何反应,我也不知道聊天成员的数量。我也没有说我需要计算哪个聊天,所以我什么也没得到也就不足为奇了,但是我应该把电报聊天 ID 放在哪里?
您收到此错误是因为 MQBot 收到一个空令牌。由于某种原因,它没有引发描述性异常,而是意外崩溃。
那为什么token是空的?看来您使用 os.environ.get
不正确。 os.environ
部分是 dictionary and its' method get
allows one to access dict's contents safely. According to docs:
get(key[, default])
Return the value for key if key is in the dictionary, else default. If default is not given, it defaults to None, so that this method never raises a KeyError.
根据您的问题,在这部分 token = os.environ.get('TOKEN')
您将令牌本身作为密钥传递。相反,您应该传递包含您的令牌的环境变量的名称。
你可以解决这个问题,要么像这样 token = 'TOKEN'
重写那个部分,要么通过 setting environmental variable correctly 并通过正确的名称从 os.environ.get
访问它。