Tweepy: Stream data for X minutes?
I'm using tweepy to data-mine the public stream of tweets for keywords. This is pretty straightforward and has been described in multiple places:
http://runnable.com/Us9rrMiTWf9bAAW3/how-to-stream-data-from-twitter-with-tweepy-for-python
http://adilmoujahid.com/posts/2014/07/twitter-analytics/
Copying the code directly from the second link:
#Import the necessary methods from tweepy library
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream

#Variables that contains the user credentials to access Twitter API
access_token = "ENTER YOUR ACCESS TOKEN"
access_token_secret = "ENTER YOUR ACCESS TOKEN SECRET"
consumer_key = "ENTER YOUR API KEY"
consumer_secret = "ENTER YOUR API SECRET"

#This is a basic listener that just prints received tweets to stdout.
class StdOutListener(StreamListener):

    def on_data(self, data):
        print data
        return True

    def on_error(self, status):
        print status

if __name__ == '__main__':

    #This handles Twitter authetification and the connection to Twitter Streaming API
    l = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, l)

    #This line filter Twitter Streams to capture data by the keywords: 'python', 'javascript', 'ruby'
    stream.filter(track=['python', 'javascript', 'ruby'])
What I can't figure out is how to stream this data into a Python variable instead of printing it to the screen. I'm working in an IPython notebook and would like to capture the stream in some variable, foo, after streaming for a minute or so. Also, how do I make the stream time out? It runs indefinitely this way.
Related:
Using tweepy to access Twitter's Streaming API
Yes, in the post, @Adil Moujahid mentions that his code ran for 3 days. I adapted the same code, and for initial testing made the following tweaks:
(a) Added a location filter to get a limited number of tweets instead of generic tweets containing the keyword.
See How to add a location filter to tweepy module.
From there, you can create an intermediate variable in the above code as follows:
stream_all = Stream(auth, l)
Say we select the San Francisco area; we can add:
stream_SFO = stream_all.filter(locations=[-122.75,36.8,-121.75,37.8])
The assumption is that filtering by location takes less time than filtering by keywords.
(b) Then you can filter for the keywords:
tweet_iter = stream_SFO.filter(track=['python', 'javascript', 'ruby'])
(c) Then you can write this out to a file as follows:
with open('file_name.json', 'w') as f:
    json.dump(tweet_iter, f, indent=1)
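Not part of the steps above, but since the question also asks about capturing the stream in a variable and stopping after a minute or so: in the tweepy versions I've used, filter() blocks rather than returning an iterator, so one workaround is to collect the tweets inside the listener and return False once a time limit has passed. A rough sketch, reusing the credential variables from the question's code (the TimedListener name and the 60-second limit are just illustrative):

import json
import time

from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener

class TimedListener(StreamListener):
    """Collect raw tweets in memory and stop after `limit` seconds."""

    def __init__(self, limit=60):
        super(TimedListener, self).__init__()
        self.start = time.time()
        self.limit = limit
        self.tweets = []  # the Python variable that holds the streamed tweets

    def on_data(self, data):
        self.tweets.append(json.loads(data))
        # Returning False tells tweepy to disconnect, so the stream stops
        # once the time limit has passed (checked whenever a tweet arrives).
        return (time.time() - self.start) < self.limit

    def on_error(self, status):
        return False

listener = TimedListener(limit=60)
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
Stream(auth, listener).filter(locations=[-122.75, 36.8, -121.75, 37.8])

# After filter() returns, the tweets live in listener.tweets and can be
# written to a file just like above.
with open('file_name.json', 'w') as f:
    json.dump(listener.tweets, f, indent=1)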
Filtering by location first should make this take less time. I happened to start looking into the same question as your post only today, so I don't have execution times yet.
Hope this helps.
I noticed that you want to stream the data into a variable for later use. The way I do this is to create a method that streams the data into a database, using sqlite3 and sqlalchemy.
For example, here is the main streaming code first:
import tweepy
import json
import time
import db_commands
import credentials

API_KEY = credentials.ApiKey
API_KEY_SECRET = credentials.ApiKeySecret
ACCESS_TOKEN = credentials.AccessToken
ACCESS_TOKEN_SECRET = credentials.AccessTokenSecret

def create_auth_instance():
    """Set up Authentication Instance"""
    auth = tweepy.OAuthHandler(API_KEY, API_KEY_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    api = tweepy.API(auth, wait_on_rate_limit=True)
    return api

class MyStreamListener(tweepy.StreamListener):
    """Listen for tweets"""

    def __init__(self, api=None):
        self.counter = 0
        # References the auth instance for the listener
        self.api = create_auth_instance()
        # Creates a database command instance
        self.dbms = db_commands.MyDatabase(db_commands.SQLITE, dbname='mydb.sqlite')
        # Creates a database table
        self.dbms.create_db_tables()

    def on_connect(self):
        """Notify when user connected to twitter"""
        print("Connected to Twitter API!")

    def on_status(self, tweet):
        """
        Every time a tweet is tracked, add the contents of the tweet,
        its username, text, and date created, into a sqlite3 database
        """
        user = tweet.user.screen_name
        text = tweet.text
        date_created = tweet.created_at
        self.dbms.insert(user, text, date_created)

    def on_error(self, status_code):
        """Handle error codes"""
        if status_code == 420:
            # Return False if stream disconnects
            return False

def main():
    """Create twitter listener (Stream)"""
    tracker_subject = input("Type subject to track: ")
    twitter_listener = MyStreamListener()
    myStream = tweepy.Stream(auth=twitter_listener.api.auth, listener=twitter_listener)
    myStream.filter(track=[tracker_subject], is_async=True)

main()
As you can see in the code, we authenticate, create a listener, and then activate a stream:
twitter_listener = MyStreamListener()
myStream = tweepy.Stream(auth=twitter_listener.api.auth, listener=twitter_listener)
myStream.filter(track=[tracker_subject], is_async=True)
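One note on the original question about limiting how long the stream runs: because the stream is started with is_async=True, filter() returns immediately and tweets are collected in a background thread. That means you can simply sleep for the desired duration and then call disconnect() on the stream. A small sketch continuing from the snippet above (the 60-second value is just an example):

import time  # already imported at the top of the script above

# the stream was started with is_async=True, so filter() returns right away
# and tweets keep arriving in a background thread
myStream.filter(track=[tracker_subject], is_async=True)

time.sleep(60)          # collect tweets for roughly one minute
myStream.disconnect()   # ask the background streaming thread to stop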
Every time we receive a tweet, the 'on_status' function is executed; it can be used to perform a set of actions on the tweet data being streamed.
def on_status(self, tweet):
    """
    Every time a tweet is tracked, add the contents of the tweet,
    its username, text, and date created, into a sqlite3 database
    """
    user = tweet.user.screen_name
    text = tweet.text
    date_created = tweet.created_at
    self.dbms.insert(user, text, date_created)
The tweet data, tweet, is captured in three variables, user, text, and date_created, which are then handed to the database controller that was initialized in the __init__ function of the MyStreamListener class. The insert function is called from the imported db_commands file.
Here is the code located in the db_commands.py file, which is pulled into the code above with import db_commands.
from sqlalchemy import create_engine
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

# Global Variables
SQLITE = 'sqlite'
# MYSQL = 'mysql'
# POSTGRESQL = 'postgresql'
# MICROSOFT_SQL_SERVER = 'mssqlserver'

# Table Names
TWEETS = 'tweets'

class MyDatabase:
    # http://docs.sqlalchemy.org/en/latest/core/engines.html
    DB_ENGINE = {
        SQLITE: 'sqlite:///{DB}',
        # MYSQL: 'mysql://scott:tiger@localhost/{DB}',
        # POSTGRESQL: 'postgresql://scott:tiger@localhost/{DB}',
        # MICROSOFT_SQL_SERVER: 'mssql+pymssql://scott:tiger@hostname:port/{DB}'
    }

    # Main DB Connection Ref Obj
    db_engine = None

    def __init__(self, dbtype, username='', password='', dbname=''):
        dbtype = dbtype.lower()
        if dbtype in self.DB_ENGINE.keys():
            engine_url = self.DB_ENGINE[dbtype].format(DB=dbname)
            self.db_engine = create_engine(engine_url)
            print(self.db_engine)
        else:
            print("DBType is not found in DB_ENGINE")

    def create_db_tables(self):
        metadata = MetaData()
        tweets = Table(TWEETS, metadata,
                       Column('id', Integer, primary_key=True),
                       Column('user', String),
                       Column('text', String),
                       Column('date_created', String),
                       )
        try:
            metadata.create_all(self.db_engine)
            print("Tables created")
        except Exception as e:
            print("Error occurred during Table creation!")
            print(e)

    # Insert, Update, Delete
    def execute_query(self, query=''):
        if query == '':
            return
        print(query)
        with self.db_engine.connect() as connection:
            try:
                connection.execute(query)
            except Exception as e:
                print(e)

    def insert(self, user, text, date_created):
        # Insert Data
        query = "INSERT INTO {}(user, text, date_created)"\
                "VALUES ('{}', '{}', '{}');".format(TWEETS, user, text, date_created)
        self.execute_query(query)
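One caveat, separate from the answer itself: tweet text very often contains single quotes, which will break the string-formatted INSERT built in insert above. If that becomes a problem, a parameterized variant using SQLAlchemy's text() construct could look roughly like this (insert_tweet is a hypothetical helper, not part of the code above):

from sqlalchemy.sql import text

def insert_tweet(db_engine, user, tweet_text, date_created):
    """Parameterized version of MyDatabase.insert (illustrative only)."""
    query = text(
        "INSERT INTO tweets (user, text, date_created) "
        "VALUES (:user, :text, :date_created)"
    )
    # engine.begin() opens a transaction and commits on exit, so the row
    # is actually persisted; bound parameters handle quoting of the values.
    with db_engine.begin() as connection:
        connection.execute(
            query,
            {"user": user, "text": tweet_text, "date_created": str(date_created)},
        )

# usage with the engine created by MyDatabase above, e.g.:
# insert_tweet(self.dbms.db_engine, user, text, date_created)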
This code uses the sqlalchemy package to create a sqlite3 database and posts the tweets to a tweets table. SQLAlchemy can be installed easily with pip install sqlalchemy. If you use these two pieces of code together, you should be able to scrape tweets through a filter into a database. Please let me know if this helps and whether you have any further questions.
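Finally, since the original question was about getting the stream into a Python variable: once the tweets are in mydb.sqlite, they can be read back into an ordinary list in the notebook. A minimal sketch, assuming the tweets table created above:

from sqlalchemy import create_engine
from sqlalchemy.sql import text

engine = create_engine('sqlite:///mydb.sqlite')
with engine.connect() as connection:
    result = connection.execute(text("SELECT user, text, date_created FROM tweets"))
    # 'tweets' is now a plain Python list you can work with in the notebook
    tweets = [{"user": row[0], "text": row[1], "date_created": row[2]} for row in result]

print(len(tweets), "tweets collected so far")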