Python API 流式传输,在一定大小后写入新文件
Python API Streaming, write new file after certain size
我有一个 python 脚本,它保持与 Twitter Streaming API 的开放连接,并将数据写入 json 文件。在当前写入的文件达到一定大小时,是否可以在不中断连接的情况下写入新文件?例如,我刚刚流式传输了超过 1 周的数据,但所有数据都包含在一个文件 (~2gb) 中,因此解析速度很慢。如果我可以在之后写入一个新文件,比如 500mb,那么我将有 4 个较小的文件(例如 dump1.json、dump2.json 等)来解析,而不是一个大文件。
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
# Add consumer/access tokens for Twitter API
consumer_key = '-----'
consumer_secret = '-----'
access_token = '-----'
access_secret = '-----'
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
api = tweepy.API(auth)
# Define streamlistener class to open a connection to Twitter and begin consuming data
class MyListener(StreamListener):
def on_data(self, data):
try:
with open('G:\xxxx\Raw_tweets.json', 'a') as f:
f.write(data)
return True
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def on_error(self, status):
print(status)
return True
bounding_box = [-77.2157,38.2036,-76.5215,39.3365]#filtering by location
keyword_list = ['']#filtering by keyword
twitter_stream = Stream(auth, MyListener())
twitter_stream.filter(locations=bounding_box) # Filter Tweets in stream by location bounding box
#twitter_stream.filter(track=keyword_list) # Filter Tweets in stream by keyword
由于您每次都重新打开文件,这很简单 - 在文件名中使用索引并在文件大小达到阈值时推进它
class MyListener(StreamListener):
def __init(self):
self._file_index = 0
def on_data(self, data):
tweets_file = 'G:\xxxx\Raw_tweets{}.json'.format(self._file_index)
while os.path.exists(tweets_file) and os.stat(tweet_file).st_size > 2**10:
self._file_index += 1
tweets_file = 'G:\xxxx\Raw_tweets{}.json'.format(self._file_index)
....
该周期将负责重新启动您的应用程序
我有一个 python 脚本,它保持与 Twitter Streaming API 的开放连接,并将数据写入 json 文件。在当前写入的文件达到一定大小时,是否可以在不中断连接的情况下写入新文件?例如,我刚刚流式传输了超过 1 周的数据,但所有数据都包含在一个文件 (~2gb) 中,因此解析速度很慢。如果我可以在之后写入一个新文件,比如 500mb,那么我将有 4 个较小的文件(例如 dump1.json、dump2.json 等)来解析,而不是一个大文件。
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener
# Add consumer/access tokens for Twitter API
consumer_key = '-----'
consumer_secret = '-----'
access_token = '-----'
access_secret = '-----'
auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_secret)
api = tweepy.API(auth)
# Define streamlistener class to open a connection to Twitter and begin consuming data
class MyListener(StreamListener):
def on_data(self, data):
try:
with open('G:\xxxx\Raw_tweets.json', 'a') as f:
f.write(data)
return True
except BaseException as e:
print("Error on_data: %s" % str(e))
return True
def on_error(self, status):
print(status)
return True
bounding_box = [-77.2157,38.2036,-76.5215,39.3365]#filtering by location
keyword_list = ['']#filtering by keyword
twitter_stream = Stream(auth, MyListener())
twitter_stream.filter(locations=bounding_box) # Filter Tweets in stream by location bounding box
#twitter_stream.filter(track=keyword_list) # Filter Tweets in stream by keyword
由于您每次都重新打开文件,这很简单 - 在文件名中使用索引并在文件大小达到阈值时推进它
class MyListener(StreamListener):
def __init(self):
self._file_index = 0
def on_data(self, data):
tweets_file = 'G:\xxxx\Raw_tweets{}.json'.format(self._file_index)
while os.path.exists(tweets_file) and os.stat(tweet_file).st_size > 2**10:
self._file_index += 1
tweets_file = 'G:\xxxx\Raw_tweets{}.json'.format(self._file_index)
....
该周期将负责重新启动您的应用程序