如何捕获电视节目协程中的错误?
How to capture errors on telethon event coroutines?
我使用以下脚本来侦听来自 public 电报频道和群组的新消息。
import configparser
from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events, sync
from telethon.tl.functions.messages import (GetHistoryRequest)
from telethon.tl.types import (
PeerChannel
)
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxxxx'
#target channels that you want to listen to:
input_channels = ('https://t.me/xxxxxx','https://t.me/xxxx','https://t.me/xxxxxxx')
#create a client
client = TelegramClient('anon', api_id, api_hash)
# Listen to messages from target channels
@client.on(events.NewMessage(chats=input_channels))
async def newMessageListener(event):
# Get message text
newMessage = event.message.message
print(newMessage)
with client:
client.run_until_disconnected()
当频道关闭时,我收到以下错误:ValueError:没有用户将“closed_channel_name”作为用户名,我停止接收任何数据。
有没有办法识别无效频道?
到目前为止,我发现了以下可以识别有效频道的方法,但可能还有更好的方法:
client.start()
result = client.get_entity('https://t.me/xxxxxx')
以下用于捕获电视节目中的错误(在协程内)的工作
import asyncio
from telethon import TelegramClient, events
session_name = 'anon'
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxx'
chat_list = ('https://t.me/xxxxxxxx', 'https://t.me/xxxxxxxxxx')
async def main():
async with TelegramClient(session_name, api_id, api_hash) as client:
@client.on(events.NewMessage(chats=chat_list))
async def handler(event):
print(event.message.message)
await client.run_until_disconnected()
def custom_exception_handler(loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
exception = context.get('exception')
if isinstance(exception, ValueError):
print(context['exception'])
loop.stop()
loop = asyncio.get_event_loop()
# Set custom handler
loop.set_exception_handler(custom_exception_handler)
loop.create_task(main())
loop.run_forever()
我使用以下脚本来侦听来自 public 电报频道和群组的新消息。
import configparser
from telethon.errors import SessionPasswordNeededError
from telethon import TelegramClient, events, sync
from telethon.tl.functions.messages import (GetHistoryRequest)
from telethon.tl.types import (
PeerChannel
)
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxxxx'
#target channels that you want to listen to:
input_channels = ('https://t.me/xxxxxx','https://t.me/xxxx','https://t.me/xxxxxxx')
#create a client
client = TelegramClient('anon', api_id, api_hash)
# Listen to messages from target channels
@client.on(events.NewMessage(chats=input_channels))
async def newMessageListener(event):
# Get message text
newMessage = event.message.message
print(newMessage)
with client:
client.run_until_disconnected()
当频道关闭时,我收到以下错误:ValueError:没有用户将“closed_channel_name”作为用户名,我停止接收任何数据。
有没有办法识别无效频道?
到目前为止,我发现了以下可以识别有效频道的方法,但可能还有更好的方法:
client.start()
result = client.get_entity('https://t.me/xxxxxx')
以下用于捕获电视节目中的错误(在协程内)的工作
import asyncio
from telethon import TelegramClient, events
session_name = 'anon'
api_id = 'xxxxxx'
api_hash = 'xxxxxxxxxxxxxxxxxxxxx'
chat_list = ('https://t.me/xxxxxxxx', 'https://t.me/xxxxxxxxxx')
async def main():
async with TelegramClient(session_name, api_id, api_hash) as client:
@client.on(events.NewMessage(chats=chat_list))
async def handler(event):
print(event.message.message)
await client.run_until_disconnected()
def custom_exception_handler(loop, context):
# first, handle with default handler
loop.default_exception_handler(context)
exception = context.get('exception')
if isinstance(exception, ValueError):
print(context['exception'])
loop.stop()
loop = asyncio.get_event_loop()
# Set custom handler
loop.set_exception_handler(custom_exception_handler)
loop.create_task(main())
loop.run_forever()