运行 在 Tweepy 中使用位置过滤器流式传输 Twitter 的时间估算

Running Time Estimate for Stream Twitter with Location Filter in Tweepy

问题已解决,请参阅 POST

末尾的解决方案

我需要帮助来估计我的 tweepy 程序使用位置过滤器调用 Twitter Stream API 的 运行ning 时间。

我启动后,它有 运行 超过 20 分钟,比我预期的要长。我是 Twitter Stream API 的新手,只使用过 REST API 几天。在我看来,REST API 会在几秒钟内给我 50 条推文,很容易。但是这个 Stream 请求花费了更多的时间。我的程序没有死机或出现任何错误。所以不知道有没有问题。如果是这样,请指出。

总而言之,如果您认为我的代码是正确的,您能否提供一个 运行ning 时间的估计值?如果您认为我的代码有误,可以帮我修改一下吗?

提前致谢!

代码如下:

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

box = [-86.33,41.63,-86.20,41.74]

class CustomStreamListener(tweepy.StreamListener):
    def on_error(self, status_code):
        print >> sys.stderr, 'Encountered error with status code:', status_code
        return True # Don't kill the stream
    def on_timeout(self):
        print >> sys.stderr, 'Timeout...'
        return True # Don't kill the stream

stream = tweepy.streaming.Stream(auth, CustomStreamListener()).filter(locations=box).items(50)
stream

我尝试了 http://docs.tweepy.org/en/v3.4.0/auth_tutorial.html#auth-tutorial 中的方法,显然它对我不起作用...下面是我的代码。你介意提供任何意见吗?如果您有一些工作代码,请告诉我。谢谢!

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.text)
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
myStream.filter(track=['python'], locations=(box), async=True)

错误信息如下:

Traceback (most recent call last):
  File "test.py", line 26, in <module>
    myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
TypeError: 'MyStreamListener' object is not callable

问题解决了!请参阅下面的解决方案

经过另一轮调试,这里是可能对同一主题感兴趣的人的解决方案:

# Import Tweepy, sys, sleep, credentials.py
try:
    import json
except ImportError:
    import simplejson as json
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.text.encode('utf-8'))
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(api.auth, listener=myStreamListener)
myStream.filter(track=['NYC'], locations=(box), async=True)

核心问题: 我认为您误解了 Stream 的含义。

Tl;dr: 您的代码正在运行,只是您没有对返回的数据执行任何操作。

其余 API 呼叫是一次信息呼叫。你发出一个请求,Twitter 发回一些信息,这些信息被分配给你的变量。

来自 Tweepy 的 StreamObject(您创建为 stream)使用您的搜索参数打开到 Twitter 的连接,然后 Twitter 将推文流式传输到它。永远。

来自 Tweepy 文档:

The streaming api is quite different from the REST api because the REST api is used to pull data from twitter but the streaming api pushes messages to a persistent session. This allows the streaming api to download more data in real time than could be done using the REST API.

因此,您需要构建一个处理程序(streamListener,用 tweepy 的术语来说),例如 this one that prints out the tweets.

额外

警告的话,来自痛苦的经历 - 如果您要尝试将推文保存到数据库:Twitter 可以并且将会以比您将它们保存到数据库更快的速度将对象流式传输给您。这将导致您的流断开连接,因为推文在 Twitter 上备份,并且超过一定程度的备份(不是实际的短语),它们只会断开您的连接。

我通过使用 django-rq 将保存作业放入作业队列来处理这个问题 - 这样,我每秒可以处理数百条推文(在高峰期),并且它会顺利进行。您可以在下面看到我是如何做到的。 Python-rq 如果你不使用 django 作为它的框架也可以工作。 read both 方法只是一个从推文中读取并将其保存到 postgres 数据库的函数。在我的具体案例中,我使用 django_rq.enqueue 函数通过 Django ORM 做到了这一点。

__author__ = 'iamwithnail'

from django.core.management.base import BaseCommand, CommandError
from django.db.utils import DataError
from harvester.tools import read_both
import django_rq

class Command(BaseCommand):

    args = '<search_string search_string>'
    help = "Opens a listener to the Twitter stream, and tracks the given string or list" \
           "of strings, saving them down to the DB as they are received."


    def handle(self, *args, **options):
        try:
            import urllib3.contrib.pyopenssl
            urllib3.contrib.pyopenssl.inject_into_urllib3()
        except ImportError:
            pass

        consumer_key = '***'
        consumer_secret = '****'
        access_token='****'
        access_token_secret_var='****'
        import tweepy
        import json

        # This is the listener, responsible for receiving data
        class StdOutListener(tweepy.StreamListener):
            def on_data(self, data):
                decoded = json.loads(data)
                try:
                    if decoded['lang'] == 'en':
                        django_rq.enqueue(read_both, decoded)
                    else:
                        pass
                except KeyError,e:
                    print "Error on Key", e
                except DataError, e:
                    print "DataError", e
                return True


            def on_error(self, status):
                print status


        l = StdOutListener()
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret_var)
        stream = tweepy.Stream(auth, l)
stream.filter(track=args)

编辑:你后面的问题是调用监听器错误导致的。

myStreamListener = MyStreamListener() #creates an instance of your class

你在哪里有这个:

myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())

您在使用 () 时试图将侦听器作为函数调用。所以应该是:

myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)

事实上,可以更简洁地写成:

myStream = tweepy.Stream(api.auth,myStreamListener)