我如何避免将 "global" 与 asyncio (telethon) 一起使用?
How I can avoid using "global" with asyncio (telethon)?
据我所知,在 python 中使用“全局”是一种不好的做法,但我不知道如何在这里避免它。
提前谢谢你。
import asyncio
from telethon import TelegramClient, sync, events
api_id = ***
api_hash = '***'
client = TelegramClient('session_name', api_id, api_hash).start()
channel_id = ***
@client.on(events.NewMessage(chats=channel_id))
async def handler(event):
text = event.raw_text.lower()
global task
if text == 'start':
task = client.loop.create_task(main())
await client.send_message(channel_id, 'task was started')
if text == 'stop':
task.cancel()
await client.send_message(channel_id, 'task was canceled')
async def main():
while True:
await client.send_message(channel_id, 'do something')
await asyncio.sleep(3)
client.run_until_disconnected()
查看文档,我找不到将客户端与 类 一起使用的推荐方法,但您应该始终能够自己连接它。例如:
class MyClient:
def __init__(self, client):
self.task = None
self.client = client
async def on_message(self, channel_id, event):
text = event.raw_text.lower()
if text == 'start':
self.task = self.client.loop.create_task(self.main(channel_id))
await self.client.send_message(channel_id, 'task was started')
if text == 'stop':
self.task.cancel()
await self.client.send_message(channel_id, 'task was canceled')
async def main(self, channel_id):
while True:
await self.client.send_message(channel_id, 'do something')
await asyncio.sleep(3)
def run_client():
# instantiate telegram client, my client, and connect the two
api_id = ***
api_hash = '***'
channel_id = ***
client = TelegramClient('session_name', api_id, api_hash).start()
my_client = MyClient(client)
@client.on(events.NewMessage(chats=channel_id))
async def handle(event):
await my_client.on_message(channel_id, event)
client.run_until_disconnected()
run_client()
据我所知,在 python 中使用“全局”是一种不好的做法,但我不知道如何在这里避免它。 提前谢谢你。
import asyncio
from telethon import TelegramClient, sync, events
api_id = ***
api_hash = '***'
client = TelegramClient('session_name', api_id, api_hash).start()
channel_id = ***
@client.on(events.NewMessage(chats=channel_id))
async def handler(event):
text = event.raw_text.lower()
global task
if text == 'start':
task = client.loop.create_task(main())
await client.send_message(channel_id, 'task was started')
if text == 'stop':
task.cancel()
await client.send_message(channel_id, 'task was canceled')
async def main():
while True:
await client.send_message(channel_id, 'do something')
await asyncio.sleep(3)
client.run_until_disconnected()
查看文档,我找不到将客户端与 类 一起使用的推荐方法,但您应该始终能够自己连接它。例如:
class MyClient:
def __init__(self, client):
self.task = None
self.client = client
async def on_message(self, channel_id, event):
text = event.raw_text.lower()
if text == 'start':
self.task = self.client.loop.create_task(self.main(channel_id))
await self.client.send_message(channel_id, 'task was started')
if text == 'stop':
self.task.cancel()
await self.client.send_message(channel_id, 'task was canceled')
async def main(self, channel_id):
while True:
await self.client.send_message(channel_id, 'do something')
await asyncio.sleep(3)
def run_client():
# instantiate telegram client, my client, and connect the two
api_id = ***
api_hash = '***'
channel_id = ***
client = TelegramClient('session_name', api_id, api_hash).start()
my_client = MyClient(client)
@client.on(events.NewMessage(chats=channel_id))
async def handle(event):
await my_client.on_message(channel_id, event)
client.run_until_disconnected()
run_client()