跟踪电报频道的新订阅者数量?
Track number of new subscribers of a Telegram channel?
我在 python 中使用 telethon,我想尝试跟踪随着时间的推移加入频道的新成员的数量。
我的第一次尝试是使用 telethon.tl.functions.channels.GetFullChannelRequest
函数:
async def save_subs(channel_id):
client = TelegramClient('anon', TELEGRAM_API_ID, TELEGRAM_API_HASH)
await client.start()
async with client:
previous_count = None
number_new_members = 0
while True:
res = await client(
GetFullChannelRequest(
channel=channel_id
)
)
channel_info = res.to_dict()
count = channel_info['full_chat']['participants_count']
print(f'current count: {count}')
if previous_count is not None and count > previous_count:
number_new_members += count - previous_count
print(f'new subs: {number_new_members}')
previous_count = count
time.sleep(1)
def start(channel_id):
asyncio.run(save_subs())
不过好像每1秒就太多了,每4个请求就有几秒的延迟。也许这是速率限制?
然后我尝试使用 events.ChatAction
事件,因为文档说新用户加入频道应该触发,但没有任何东西被触发:
def check_new_user(channel_id):
telegram_client = TelegramClient('anon', TELEGRAM_API_ID, TELEGRAM_API_HASH)
@telegram_client.on(events.ChatAction)
async def my_event_handler(event):
# this never gets printed
print('event caught')
telegram_client.start()
telegram_client.run_until_disconnected()
def start(channel_id):
check_events(channel_id)
我建议采用更直接的方法。您可以使用 Telegram 的 t.me
应用程序。
DEFAULT_TIMEOUT = 10
def scrap_telegram_members_count(slug: str) -> int:
def parse_int(s: str) -> int:
return int(re.sub(r"[^\d]+", "", s))
r = requests.get(
f"https://t.me/{slug}",
timeout=DEFAULT_TIMEOUT,
)
r.raise_for_status()
text = r.text
m = re.search(r'<div class="tgme_page_extra">([^,]+),([^<]+)</div>', text)
if not m:
raise RuntimeError("Cannot find members count")
members_count = parse_int(m.group(1))
return members_count
根据我的经验,现在您可以通过这种方式每天发送不少于 ~1k 的请求。
许多对 Telegram 的请求都包含 count
,可通过返回列表中的 Telethon 的 .total
属性访问:
participants = await client.get_participants(channel, 1)
print(participants.total)
这将获取单个参与者(您必须获得这样做的许可),作为副产品,Telegram 会填写总数。
我在 python 中使用 telethon,我想尝试跟踪随着时间的推移加入频道的新成员的数量。
我的第一次尝试是使用 telethon.tl.functions.channels.GetFullChannelRequest
函数:
async def save_subs(channel_id):
client = TelegramClient('anon', TELEGRAM_API_ID, TELEGRAM_API_HASH)
await client.start()
async with client:
previous_count = None
number_new_members = 0
while True:
res = await client(
GetFullChannelRequest(
channel=channel_id
)
)
channel_info = res.to_dict()
count = channel_info['full_chat']['participants_count']
print(f'current count: {count}')
if previous_count is not None and count > previous_count:
number_new_members += count - previous_count
print(f'new subs: {number_new_members}')
previous_count = count
time.sleep(1)
def start(channel_id):
asyncio.run(save_subs())
不过好像每1秒就太多了,每4个请求就有几秒的延迟。也许这是速率限制?
然后我尝试使用 events.ChatAction
事件,因为文档说新用户加入频道应该触发,但没有任何东西被触发:
def check_new_user(channel_id):
telegram_client = TelegramClient('anon', TELEGRAM_API_ID, TELEGRAM_API_HASH)
@telegram_client.on(events.ChatAction)
async def my_event_handler(event):
# this never gets printed
print('event caught')
telegram_client.start()
telegram_client.run_until_disconnected()
def start(channel_id):
check_events(channel_id)
我建议采用更直接的方法。您可以使用 Telegram 的 t.me
应用程序。
DEFAULT_TIMEOUT = 10
def scrap_telegram_members_count(slug: str) -> int:
def parse_int(s: str) -> int:
return int(re.sub(r"[^\d]+", "", s))
r = requests.get(
f"https://t.me/{slug}",
timeout=DEFAULT_TIMEOUT,
)
r.raise_for_status()
text = r.text
m = re.search(r'<div class="tgme_page_extra">([^,]+),([^<]+)</div>', text)
if not m:
raise RuntimeError("Cannot find members count")
members_count = parse_int(m.group(1))
return members_count
根据我的经验,现在您可以通过这种方式每天发送不少于 ~1k 的请求。
许多对 Telegram 的请求都包含 count
,可通过返回列表中的 Telethon 的 .total
属性访问:
participants = await client.get_participants(channel, 1)
print(participants.total)
这将获取单个参与者(您必须获得这样做的许可),作为副产品,Telegram 会填写总数。