使用 firehose 将 Twitter 数据流式传输到 S3 存储桶

Streaming twitter data to an S3 bucket using firehose

我正在尝试将数据从 Twitter 流式传输到 aws 存储桶。好消息是我可以将数据流式传输到我的存储桶,但数据以大约 20 kb 的块形式出现(我认为这可能是由于某些 firehose 设置所致)并且即使在我指定它之后它也不会保存为 JSON在我的 python 代码中使用 JSON.LOAD。我的 S3 存储桶中的数据没有保存为 JSON,而是没有文件扩展名,而是一长串字母数字字符。我认为这可能与 client.put_record()

中使用的参数有关

非常感谢任何帮助!

请在下面找到我的代码,我从 github here.


from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json
import boto3
import time


#Variables that contains the user credentials to access Twitter API
consumer_key = "MY_CONSUMER_KEY"
consumer_secret = "MY_CONSUMER_SECRET"
access_token = "MY_ACCESS_TOKEN"
access_token_secret = "MY_SECRET_ACCESS_TOKEN"


#This is a basic listener that just prints received tweets to stdout.
class StdOutListener(StreamListener):

    def on_data(self, data):
        tweet = json.loads(data)
        try:
            if 'extended_tweet' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['extended_tweet']['full_text'],
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       '\n'
                       ]
                message = '\t'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
            elif 'text' in tweet.keys():
                #print (tweet['text'])
                message_lst = [str(tweet['id']),
                       str(tweet['user']['name']),
                       str(tweet['user']['screen_name']),
                       tweet['text'].replace('\n',' ').replace('\r',' '),
                       str(tweet['user']['followers_count']),
                       str(tweet['user']['location']),
                       str(tweet['geo']),
                       str(tweet['created_at']),
                       '\n'
                       ]
                message = '\t'.join(message_lst)
                print(message)
                client.put_record(
                    DeliveryStreamName=delivery_stream,
                    Record={
                    'Data': message
                    }
                )
        except (AttributeError, Exception) as e:
                print (e)
        return True

    def on_error(self, status):
        print (status)
        
        
        
        
        
if __name__ == '__main__':

    #This handles Twitter authetification and the connection to Twitter Streaming API
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    #tweets = Table('tweets_ft',connection=conn)
    client = boto3.client('firehose', 
                          region_name='us-east-1',
                          aws_access_key_id='MY ACCESS KEY',
                          aws_secret_access_key='MY SECRET KEY' 
                          )

    delivery_stream = 'my_firehose'
    #This line filter Twitter Streams to capture data by the keywords: 'python', 'javascript', 'ruby'
    #stream.filter(track=['trump'], stall_warnings=True)
    while True:
        try:
            print('Twitter streaming...')
            stream = Stream(auth, listener)
            stream.filter(track=['brexit'], languages=['en'], stall_warnings=True)
        except Exception as e:
            print(e)
            print('Disconnected...')
            time.sleep(5)
            continue   

您可能为 firehose 启用了 S3 压缩。如果您想在存储桶中存储原始 json 数据,请确保禁用压缩:

您还可以对 firehose 应用一些转换,将 or otherwise transform 您的 json 消息编码成其他格式。

所以看起来文件是 JSON 格式的,我只需要用 firefox 打开 S3 中的文件,我就可以看到文件的内容。文件大小的问题是由于 firehose 缓冲区设置,我将它们设置为最低,这就是文件以如此小的块发送的原因