如何使用 Tweepy 通过带有语言的主题标签和计数过滤器来流式传输推文?

How to stream Tweets by hashtag with language AND count filter using Tweepy?

所以我想做的是直播来自 Twitter 的推文 API:仅针对主题标签 'Brexit',仅使用英语,并且针对特定数量的推文(1k - 2k ).

到目前为止,我的代码将实时流式传输推文,但无论我以何种方式修改它,我最终要么忽略计数并无限期地流式传输,要么出现错误。如果我将其更改为仅流式传输特定用户的推文,则计数功能有效,但它会忽略主题标签。如果我流式传输给定主题标签的所有内容,它会完全忽略计数。我在尝试修复它方面做得不错,但我经验不足,而且真的碰壁了。

如果我能得到一些关于如何同时勾选所有这些框的帮助,我将不胜感激! 到目前为止,下面的代码只会无限期地传输 'Brexit' 条推文,因此会忽略 count=10

下面的代码由于我在玩弄,有点乱,请见谅:

import numpy as np
import pandas as pd
import tweepy
from tweepy import API
from tweepy import Cursor
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import Twitter_Credentials
import matplotlib.pyplot as plt

# Twitter client - hash out to stream all


class TwitterClient:
    def __init__(self, twitter_user=None):
        self.auth = TwitterAuthenticator().authenticate_twitter_app()
        self.twitter_client = API(self.auth)

        self.twitter_user = twitter_user

    def get_twitter_client_api(self):
        return self.twitter_client

# Twitter authenticator


class TwitterAuthenticator:
    def authenticate_twitter_app(self):
        auth = OAuthHandler(Twitter_Credentials.consumer_key, Twitter_Credentials.consumer_secret)
        auth.set_access_token(Twitter_Credentials.access_token, Twitter_Credentials.access_secret)
        return auth

class TwitterStreamer():
    # Class for streaming and processing live Tweets
    def __init__(self):
        self.twitter_authenticator = TwitterAuthenticator()

    def stream_tweets(self, fetched_tweets_filename, hash_tag_list):

        # this handles Twitter authentication and connection to Twitter API
        listener = TwitterListener(fetched_tweets_filename)
        auth = self.twitter_authenticator.authenticate_twitter_app()
        stream = Stream(auth, listener)
        # This line filters Twitter stream to capture data by keywords
        stream.filter(track=hash_tag_list)

# Twitter stream listener

class TwitterListener(StreamListener):
    # This is a listener class that prints incoming Tweets to stdout
    def __init__(self, fetched_tweets_filename):
        self.fetched_tweets_filename = fetched_tweets_filename

    def on_data(self, data):
        try:
            print(data)
            with open(self.fetched_tweets_filename, 'a') as tf:
                tf.write(data)
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        if status == 420:
            # Return false on data in case rate limit occurs
            return False
        print(status)

class TweetAnalyzer():
    # Functionality for analysing and categorising content from tweets

    def tweets_to_data_frame(self, tweets):
        df = pd.DataFrame(data=[tweet.text for tweet in tweets], columns=['tweets'])

        df['id'] = np.array([tweet.id for tweet in tweets])
        df['len'] = np.array([len(tweet.text) for tweet in tweets])
        df['date'] = np.array([tweet.created_at for tweet in tweets])
        df['source'] = np.array([tweet.source for tweet in tweets])
        df['likes'] = np.array([tweet.favorite_count for tweet in tweets])
        df['retweets'] = np.array([tweet.retweet_count for tweet in tweets])

        return df


if __name__ == "__main__":

    auth = OAuthHandler(Twitter_Credentials.consumer_key, Twitter_Credentials.consumer_secret)
    auth.set_access_token(Twitter_Credentials.access_token, Twitter_Credentials.access_secret)
    api = tweepy.API(auth)

    for tweet in Cursor(api.search, q="#brexit", count=10,
                               lang="en",
                               since="2019-04-03").items():
        fetched_tweets_filename = "tweets.json"
        twitter_streamer = TwitterStreamer()
        hash_tag_list = ["Brexit"]
        twitter_streamer.stream_tweets(fetched_tweets_filename, hash_tag_list)

您正在尝试使用两种不同的方法访问 Twitter API - 流式传输是实时的,而搜索是一次性的 API 调用。

由于流式传输是连续且实时的,因此无法对其应用结果计数 - 代码只是打开一个连接,说 "hey, send me all the Tweets from now onwards that contain the hash_tag_list",然后坐下来收听。然后你进入 StreamListener,对于收到的每条推文,你将它们写入一个文件。

可以在这里应用一个计数器,但你需要将它包装在你的 StreamListener on_data 处理程序中,并为每个增加计数器收到推文。当你的推文达到 1000 条时,停止收听。

对于搜索选项,您有几个问题...第一个是您要搜索 2019 年以来的推文,但标准搜索 API 只能返回 7 天前的时间.你显然在那里只要求了 10 条推文。不过,按照您编写方法的方式,实际发生的情况是,对于 API returns 的 10 条集合中的每条推文,您 然后 创建一个实时流连接并开始收听和写入文件。所以那是行不通的。

您需要选择一个 - 要么搜索 1000 条推文并将它们写入文件(永远不要设置 TwitterStreamer()),要么侦听 1000 条推文并将它们写入文件(删除for Tweet in Cursor(api.search... 直接跳到主播。

只需将主题标签符号添加到列表中的搜索词组,它就会匹配使用特定主题标签的推文。它区分大小写,因此您可能希望向搜索数组添加尽可能多的选项。仅使用“Brexit”匹配可能使用或不使用主题标签但包含关键字“Brexit”的推文。

hash_tag_list = ["#Brexit"]