如何将搜索关键字附加到 Twitter 的 JSON 数据中?
How to append search keyword to twitter json data?
我正在通过 Kafka 处理 Twitter 流数据。我已经能够流式传输数据并消费 Twitter 的 JSON。但是现在,我该如何创建一个同时包含 Twitter 数据和搜索关键字的 PySpark DataFrame?
下面是我编写 Kafka producer 的代码:
我已经能够从 Twitter 对象中提取所需的数据并创建 DataFrame,但是我不知道如何获取搜索关键字。
class StdOutListener(StreamListener):
    """Stream listener that forwards each raw message to a Kafka topic.

    Every payload handed to ``on_data`` is UTF-8 encoded and sent, unmodified,
    to the ``twitterstreamingdata`` topic through the supplied producer.
    """

    def __init__(self, producer):
        """Store the producer used to publish incoming messages.

        Args:
            producer: Object exposing ``send(topic, value)``; the script in
                this file passes a ``KafkaProducer``.
        """
        # Run the base-class initialiser so the listener is fully set up.
        super().__init__()
        self.producer_obj = producer

    def on_data(self, data):
        """Publish one raw stream message to Kafka and echo it to stdout.

        Args:
            data: Raw message text; it is encoded with ``str.encode``.

        Returns:
            Always ``True``, including after a failed send.
        """
        try:
            self.producer_obj.send("twitterstreamingdata", data.encode('utf-8'))
            print(data)
        except Exception as e:
            # Exception (not BaseException) so that Ctrl+C and SystemExit
            # still terminate the program instead of being swallowed here.
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        """Print the error status reported by the stream and return ``True``.

        NOTE(review): tweepy's streaming guide suggests returning ``False``
        for HTTP 420 so the client backs off -- confirm whether retrying on
        every status is intended.
        """
        print(status)
        return True

    def on_limit(self, track):
        """Report a rate-limit notice and return ``True``."""
        print("Rate limited, continuing")
        return True

    def on_timeout(self):
        """Report a timeout on stderr, pause for 120 seconds, return ``True``."""
        # The message must go through ``file=``; passing sys.stderr as a
        # positional argument would print its repr to stdout instead.
        print('Timeout...', file=sys.stderr)
        time.sleep(120)
        return True  # To continue listening

    def on_disconnect(self, notice):
        """Handle a disconnect notice; nothing is done and ``None`` is returned."""
        return
if __name__ == '__main__':
    # NOTE(review): this session is not referenced again in the visible
    # script; it is kept so the start-up side effects stay unchanged.
    spark = SparkSession \
        .builder \
        .appName("Kafka Producer Application") \
        .getOrCreate()

    # Kafka producer that the listener publishes tweets through.
    producer = KafkaProducer(bootstrap_servers='xx.xxx.xxx.xxx:9092')

    # Twitter authentication and the streaming connection. The four
    # credential names are expected to be defined elsewhere.
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, StdOutListener(producer))

    print("Kafka Producer Application: ")
    WORDS = input("Enter any words: ")
    print("Is this what you just said?", WORDS)

    # Split the comma-separated input, trimming surrounding whitespace and
    # dropping blank entries so that "a, b," tracks exactly ["a", "b"].
    word = [term.strip() for term in WORDS.split(',') if term.strip()]
    if not word:
        # Without any keyword there is nothing to filter the stream by.
        raise SystemExit("No keywords entered; nothing to track.")

    # Filter the Twitter stream by the requested keywords.
    stream.filter(track=word)
解决这个问题的一种方法是:修改 StdOutListener 类的构造函数,使其接收一个 "keyword" 参数,并在 "on_data" 函数中把 "keyword" 添加到 JSON 中,然后再发送到 Kafka。
import json
import sys
import time
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from tweepy import StreamListener, Stream, OAuthHandler
class StdOutListener(StreamListener):
    """Stream listener that tags each message with a keyword and sends it to Kafka.

    Each payload handed to ``on_data`` is parsed as JSON, given an extra
    ``"keyword"`` entry, re-serialised and published to the
    ``twitterstreamingdata`` topic.
    """

    def __init__(self, producer: KafkaProducer = None, keyword=None):
        """Store the producer and the keyword value attached to every message.

        Args:
            producer: Producer whose ``send(topic, value)`` publishes messages.
            keyword: Value stored under ``"keyword"`` in each outgoing
                message. The script in this file passes the whole list of
                tracked terms, not the single term that matched a tweet.
        """
        # ``super(StreamListener, self)`` would start the lookup *after*
        # StreamListener and skip its initialiser; the zero-argument form
        # runs the direct base class's ``__init__`` as intended.
        super().__init__()
        self.producer = producer
        self.keyword = keyword

    def on_data(self, data):
        """Add the keyword to one raw JSON message and publish it to Kafka.

        Args:
            data: Raw message text containing a JSON object.

        Returns:
            Always ``True``, including when parsing or sending fails.
        """
        try:
            payload = json.loads(data)
            payload['keyword'] = self.keyword
            message = json.dumps(payload)
            self.producer.send("twitterstreamingdata", message.encode('utf-8'))
        except Exception as e:
            # Exception (not BaseException) so that Ctrl+C and SystemExit
            # still terminate the program instead of being swallowed here.
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        """Print the error status reported by the stream and return ``True``.

        NOTE(review): tweepy's streaming guide suggests returning ``False``
        for HTTP 420 so the client backs off -- confirm whether retrying on
        every status is intended.
        """
        print(status)
        return True

    def on_limit(self, track):
        """Report a rate-limit notice and return ``True``."""
        print("Rate limited, continuing")
        return True

    def on_timeout(self):
        """Report a timeout on stderr, pause for 120 seconds, return ``True``."""
        # The message must go through ``file=``; passing sys.stderr as a
        # positional argument would print its repr to stdout instead.
        print('Timeout...', file=sys.stderr)
        time.sleep(120)
        return True  # To continue listening

    def on_disconnect(self, notice):
        """Handle a disconnect notice; nothing is done and ``None`` is returned."""
        return
if __name__ == '__main__':
    # Placeholder credentials; replace with real values before running.
    CONSUMER_KEY = 'YOUR CONSUMER KEY'
    CONSUMER_SECRET = 'YOUR CONSUMER SECRET'
    ACCESS_TOKEN = 'YOUR ACCESS TOKEN'
    ACCESS_SECRET = 'YOUR ACCESS SECRET'

    print("Kafka Producer Application: ")
    words = input("Enter any words: ")
    print("Is this what you just said?", words)

    # Split the comma-separated input, trimming surrounding whitespace and
    # dropping blank entries so that "a, b," tracks exactly ["a", "b"].
    word = [term.strip() for term in words.split(',') if term.strip()]
    if not word:
        # Without any keyword there is nothing to filter the stream by.
        raise SystemExit("No keywords entered; nothing to track.")

    # NOTE(review): this session is not referenced again in the visible
    # script; it is kept so the start-up side effects stay unchanged.
    spark = SparkSession \
        .builder \
        .appName("Kafka Producer Application") \
        .getOrCreate()

    # Kafka producer that the listener publishes tweets through.
    kafka_producer = KafkaProducer(bootstrap_servers='35.240.157.219:9092')

    # Twitter authentication and the streaming connection.
    auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # The full keyword list is handed to the listener, so every message is
    # tagged with all tracked terms rather than only the one that matched.
    stream = Stream(auth, StdOutListener(producer=kafka_producer, keyword=word))
    stream.filter(track=word)
希望对您有所帮助!
我正在通过 Kafka 处理 Twitter 流数据。我已经能够流式传输数据并消费 Twitter 的 JSON。但是现在,我该如何创建一个同时包含 Twitter 数据和搜索关键字的 PySpark DataFrame?
下面是我编写 Kafka producer 的代码:
我已经能够从 Twitter 对象中提取所需的数据并创建 DataFrame,但是我不知道如何获取搜索关键字。
class StdOutListener(StreamListener):
    """Stream listener that forwards each raw message to a Kafka topic.

    Every payload handed to ``on_data`` is UTF-8 encoded and sent, unmodified,
    to the ``twitterstreamingdata`` topic through the supplied producer.
    """

    def __init__(self, producer):
        """Store the producer used to publish incoming messages.

        Args:
            producer: Object exposing ``send(topic, value)``; the script in
                this file passes a ``KafkaProducer``.
        """
        # Run the base-class initialiser so the listener is fully set up.
        super().__init__()
        self.producer_obj = producer

    def on_data(self, data):
        """Publish one raw stream message to Kafka and echo it to stdout.

        Args:
            data: Raw message text; it is encoded with ``str.encode``.

        Returns:
            Always ``True``, including after a failed send.
        """
        try:
            self.producer_obj.send("twitterstreamingdata", data.encode('utf-8'))
            print(data)
        except Exception as e:
            # Exception (not BaseException) so that Ctrl+C and SystemExit
            # still terminate the program instead of being swallowed here.
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        """Print the error status reported by the stream and return ``True``.

        NOTE(review): tweepy's streaming guide suggests returning ``False``
        for HTTP 420 so the client backs off -- confirm whether retrying on
        every status is intended.
        """
        print(status)
        return True

    def on_limit(self, track):
        """Report a rate-limit notice and return ``True``."""
        print("Rate limited, continuing")
        return True

    def on_timeout(self):
        """Report a timeout on stderr, pause for 120 seconds, return ``True``."""
        # The message must go through ``file=``; passing sys.stderr as a
        # positional argument would print its repr to stdout instead.
        print('Timeout...', file=sys.stderr)
        time.sleep(120)
        return True  # To continue listening

    def on_disconnect(self, notice):
        """Handle a disconnect notice; nothing is done and ``None`` is returned."""
        return
if __name__ == '__main__':
    # NOTE(review): this session is not referenced again in the visible
    # script; it is kept so the start-up side effects stay unchanged.
    spark = SparkSession \
        .builder \
        .appName("Kafka Producer Application") \
        .getOrCreate()

    # Kafka producer that the listener publishes tweets through.
    producer = KafkaProducer(bootstrap_servers='xx.xxx.xxx.xxx:9092')

    # Twitter authentication and the streaming connection. The four
    # credential names are expected to be defined elsewhere.
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    stream = Stream(auth, StdOutListener(producer))

    print("Kafka Producer Application: ")
    WORDS = input("Enter any words: ")
    print("Is this what you just said?", WORDS)

    # Split the comma-separated input, trimming surrounding whitespace and
    # dropping blank entries so that "a, b," tracks exactly ["a", "b"].
    word = [term.strip() for term in WORDS.split(',') if term.strip()]
    if not word:
        # Without any keyword there is nothing to filter the stream by.
        raise SystemExit("No keywords entered; nothing to track.")

    # Filter the Twitter stream by the requested keywords.
    stream.filter(track=word)
解决这个问题的一种方法是:修改 StdOutListener 类的构造函数,使其接收一个 "keyword" 参数,并在 "on_data" 函数中把 "keyword" 添加到 JSON 中,然后再发送到 Kafka。
import json
import sys
import time
from kafka import KafkaProducer
from pyspark.sql import SparkSession
from tweepy import StreamListener, Stream, OAuthHandler
class StdOutListener(StreamListener):
    """Stream listener that tags each message with a keyword and sends it to Kafka.

    Each payload handed to ``on_data`` is parsed as JSON, given an extra
    ``"keyword"`` entry, re-serialised and published to the
    ``twitterstreamingdata`` topic.
    """

    def __init__(self, producer: KafkaProducer = None, keyword=None):
        """Store the producer and the keyword value attached to every message.

        Args:
            producer: Producer whose ``send(topic, value)`` publishes messages.
            keyword: Value stored under ``"keyword"`` in each outgoing
                message. The script in this file passes the whole list of
                tracked terms, not the single term that matched a tweet.
        """
        # ``super(StreamListener, self)`` would start the lookup *after*
        # StreamListener and skip its initialiser; the zero-argument form
        # runs the direct base class's ``__init__`` as intended.
        super().__init__()
        self.producer = producer
        self.keyword = keyword

    def on_data(self, data):
        """Add the keyword to one raw JSON message and publish it to Kafka.

        Args:
            data: Raw message text containing a JSON object.

        Returns:
            Always ``True``, including when parsing or sending fails.
        """
        try:
            payload = json.loads(data)
            payload['keyword'] = self.keyword
            message = json.dumps(payload)
            self.producer.send("twitterstreamingdata", message.encode('utf-8'))
        except Exception as e:
            # Exception (not BaseException) so that Ctrl+C and SystemExit
            # still terminate the program instead of being swallowed here.
            print("Error on_data: %s" % str(e))
        return True

    def on_error(self, status):
        """Print the error status reported by the stream and return ``True``.

        NOTE(review): tweepy's streaming guide suggests returning ``False``
        for HTTP 420 so the client backs off -- confirm whether retrying on
        every status is intended.
        """
        print(status)
        return True

    def on_limit(self, track):
        """Report a rate-limit notice and return ``True``."""
        print("Rate limited, continuing")
        return True

    def on_timeout(self):
        """Report a timeout on stderr, pause for 120 seconds, return ``True``."""
        # The message must go through ``file=``; passing sys.stderr as a
        # positional argument would print its repr to stdout instead.
        print('Timeout...', file=sys.stderr)
        time.sleep(120)
        return True  # To continue listening

    def on_disconnect(self, notice):
        """Handle a disconnect notice; nothing is done and ``None`` is returned."""
        return
if __name__ == '__main__':
    # Placeholder credentials; replace with real values before running.
    CONSUMER_KEY = 'YOUR CONSUMER KEY'
    CONSUMER_SECRET = 'YOUR CONSUMER SECRET'
    ACCESS_TOKEN = 'YOUR ACCESS TOKEN'
    ACCESS_SECRET = 'YOUR ACCESS SECRET'

    print("Kafka Producer Application: ")
    words = input("Enter any words: ")
    print("Is this what you just said?", words)

    # Split the comma-separated input, trimming surrounding whitespace and
    # dropping blank entries so that "a, b," tracks exactly ["a", "b"].
    word = [term.strip() for term in words.split(',') if term.strip()]
    if not word:
        # Without any keyword there is nothing to filter the stream by.
        raise SystemExit("No keywords entered; nothing to track.")

    # NOTE(review): this session is not referenced again in the visible
    # script; it is kept so the start-up side effects stay unchanged.
    spark = SparkSession \
        .builder \
        .appName("Kafka Producer Application") \
        .getOrCreate()

    # Kafka producer that the listener publishes tweets through.
    kafka_producer = KafkaProducer(bootstrap_servers='35.240.157.219:9092')

    # Twitter authentication and the streaming connection.
    auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

    # The full keyword list is handed to the listener, so every message is
    # tagged with all tracked terms rather than only the one that matched.
    stream = Stream(auth, StdOutListener(producer=kafka_producer, keyword=word))
    stream.filter(track=word)
希望对您有所帮助!