如何从 StreamListener 处理程序(twitter)向电报发送消息? Tweepy & 电视节目

How to send messages to telegram from the StreamListener handler (twitter)? tweepy & telethon

启动脚本后我设法只发送了 1 条消息,之后它挂起并且不再接收来自 Twitter 的消息 如果我删除包裹在“----------------------------”中的代码块,那么我将收到所有推文,但是当我尝试将它发送到 Telegram 时,它在第一次后停止

最初没有单独的线程,因为我无法达到结果 将所有内容包装在单独的线程中,但结果是一样的

我做错了什么?

from telethon import TelegramClient, events, sync
from telethon.tl.types import InputChannel
import tweepy
import yaml
import sys
import coloredlogs, logging
import asyncio
import threading
import concurrent.futures
import time

start_twitter = threading.Event()

forwardinput_channel_entities = []
forwardoutput_channels = {}

class MyStreamListener(tweepy.StreamListener):
    def on_status(self, status):
        user_id = status.user.id

        if user_id in forwardoutput_channels:
            for output_channel in forwardoutput_channels[user_id]:

                message = status.text
                logging.info('-------------')
                logging.info(message)

                # ------------------------------
                try:
                    loop = asyncio.get_event_loop()
                except Exception as e:
                    loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(loop)
                    logging.error(e)
                    pass

                loop.run_until_complete(telegram_client.send_message(
                        output_channel['channel'], message))
                # ------------------------------

def twitter_thred():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    auth = tweepy.OAuthHandler(config['twitter_consumer_api'],
                               config['twitter_consumer_secret'])

    auth.set_access_token(config['twitter_user_api'],
                          config['twitter_user_secret'])

    global twitter_api
    twitter_api = tweepy.API(auth)

    myStreamListener = MyStreamListener()
    myStream = tweepy.Stream(auth=twitter_api.auth, listener=myStreamListener)

    start_twitter.wait()
    myStream.filter(follow=forwardinput_channel_entities,
                    is_async=True)

def telegram_thred():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    global telegram_client
    telegram_client = TelegramClient(config['session_name'],
                                     config['api_id'],
                                     config['api_hash'])
    telegram_client.start()

    for forwardto in config['forwardto_list_ids']:

        for twitter_user_id in forwardto['from']:
            forwardinput_channel_entities.append(str(twitter_user_id))

            channels = []

            for channel in telegram_client.iter_dialogs():
                if channel.entity.id in forwardto['to']:
                    channels.append({
                        'channel': InputChannel(
                            channel.entity.id, channel.entity.access_hash),
                    })

            forwardoutput_channels[twitter_user_id] = channels

    start_twitter.set()

    telegram_client.run_until_disconnected()

def start():

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(telegram_thred)
        future = executor.submit(twitter_thred)

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print(f'Usage: {sys.argv[0]} {{CONFIG_PATH}}')
        sys.exit(1)
    with open(sys.argv[1], 'rb') as f:
        global config
        config = yaml.safe_load(f)

    coloredlogs.install(
        fmt='%(asctime)s.%(msecs)03d %(message)s',
        datefmt='%H:%M:%S')
        
    start()

运行 脚本的 yml 配置示例:

# telegram
api_id: *****************
api_hash: '*****************'
session_name: 'test'

# twitter
twitter_consumer_api: '*****************'
twitter_consumer_secret: '*****************'
twitter_user_api: '*****************'
twitter_user_secret: '*****************'

forwardto_list_ids:
  - from:
      - 0000000000    # account twitter id
    to:
      - 0000000000    # telegram channel id

,Tweepy 尚不支持异步流式传输,因此当您 运行 流式传输时它会阻塞事件循环。 is_async 使用线程方法。

现在,您应该考虑使用 Tweepy 的异步流分支/https://github.com/tweepy/tweepy/pull/1491