How to parse multiple JSON objects coming from tweepy's on_data method stream into PostgreSQL in Python?
I'm trying to parse the data returned from tweepy's on_data method (each line of the streamed data contains one JSON object, and each payload contains one to several lines) and send it to a PostgreSQL DB, but it always fails to insert the record into the table and shows a TypeError('list indices must be integers, not str') message.
I'm not sure, but I think the problem is in the code where I use data.splitlines() and try to read/process each line into PostgreSQL. However, when I print the data to the console, it always returns multiple lines of data, and I can't process them one by one.
I'm setting up a Python script to collect Twitter stream data using tweepy, and I'm trying to store the data into Apache Spark and a PostgreSQL DB so that I can load it into Grafana.
I've also tried several of the solutions from here and here, but there still seems to be a problem.
def on_data(self, data):
    (width, height) = getTerminalSize()
    try:
        # here data comes in as multiple JSON objects, one per line; but when I try to read it line by line using splitlines() and a for loop, it still returns multiple lines, so I can't insert into the PostgreSQL table
        lines = data.splitlines()
        for line in lines:
            d = json.loads(line)
            # processing the data
            if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                # send data to socket for processing in spark
                self.client_socket.send(data)
                # insert record to table
                try:
                    conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                    cursor = conn.cursor()
                    # insert ops
                    item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                    sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                    cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                    conn.commit()
                    count = cursor.rowcount
                    print (count, " Record inserted successfully into table")
                except (Exception, psycopg2.Error) as error:
                    if(conn):
                        print("Failed to insert record into table", error)
                finally:
                    #closing database connection.
                    if(conn):
                        cursor.close()
                        conn.close()
                        print(" [*] PostgreSQL connection is closed")
                print console_colors.GREY+ "_" * width + console_colors.ENDC
            elif 'extended_tweet' not in d and 'RT @' not in d['text'].encode('utf-8'):
                teks_twit = str(d['text'].encode('utf-8'))
                print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                # send data to socket for processing in spark
                self.client_socket.send(data)
                # insert record to table
                try:
                    conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                    cursor = conn.cursor()
                    # insert ops
                    item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                    sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                    cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                    conn.commit()
                    count = cursor.rowcount
                    print (count, " Record inserted successfully into table")
                except (Exception, psycopg2.Error) as error:
                    if(conn):
                        print("Failed to insert record into table", error)
                finally:
                    #closing database connection.
                    if(conn):
                        cursor.close()
                        conn.close()
                        print(" [*] PostgreSQL connection is closed")
                print console_colors.GREY+ "_" * width + console_colors.ENDC
            else:
                pass
    except BaseException as e:
        print("Error on_data: %s" % str(e))
An example of the expected result for each iteration of the for loop:
d['user']['screen_name']
fooUser
Only one result per iteration.
Actual result:
d['user']['screen_name']
fooUser
userNumber2
anotherUser
More than one result per iteration of the for loop.
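Just to illustrate what I expect: when each line holds exactly one JSON object, json.loads on that line should give one dict and one screen name per iteration. A simplified sketch with a made-up two-line payload (fooUser and userNumber2 are only example values):

```python
import json

# made-up example: two JSON objects, one per line, as on_data should hand them over
data = ('{"user": {"screen_name": "fooUser"}, "text": "first tweet"}\n'
        '{"user": {"screen_name": "userNumber2"}, "text": "second tweet"}')

for line in data.splitlines():
    d = json.loads(line)                # exactly one object per iteration
    print(d['user']['screen_name'])     # prints a single screen name each time
```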
UPDATE:
I updated the code, and it seems that d['user']['screen_name'] and the other keys now successfully return a single item, right before it gets inserted into the PostgreSQL table.
However, the TypeError('list indices must be integers, not str') message is still there. Still trying other options.
These are the changes:
...
try:
    # split each line of data
    for item in data.splitlines():
        # loads the data into json object
        d = json.loads(item)
        # processing the data
        if d["user"]["lang"] == "id":
            if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
...
and
...
try:
    conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
    cursor = conn.cursor()
    # insert ops
    # check if the data is already separated
    print d['user']['screen_name']
    # insert data into table
    sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
    cursor.execute(sqlquery, (str(d['user']['screen_name'].encode('utf-8')), str(d['extended_tweet']['full_text'].encode('utf-8')), str(d['entities']['hashtags']['text']), len(str(d['extended_tweet']['full_text'].encode('utf-8')).split())))
    conn.commit()
    count = cursor.rowcount
    print (count, " Record inserted successfully into table")
except (Exception, psycopg2.Error) as error:
    if(conn):
        print("Failed to insert record into table", error)
finally:
    #closing database connection.
    if(conn):
        cursor.close()
        conn.close()
        print(" [*] PostgreSQL connection is closed")
...
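While digging into this I think I found the likely cause (my understanding, so take it with a grain of salt): d['entities']['hashtags'] in the Twitter payload is a list of objects, so indexing it with the string key 'text' raises exactly "list indices must be integers, not str"; on top of that, psycopg2 expects %s for every placeholder regardless of type, so the %d in the VALUES clause would fail as well. A minimal sketch of the difference:

```python
import json

# the hashtags entity is a list of objects, for example:
d = json.loads('{"entities": {"hashtags": [{"text": "spark"}, {"text": "grafana"}]}}')

# d['entities']['hashtags']['text']   # raises: list indices must be integers, not str
hashtags = [h['text'] for h in d['entities']['hashtags']]   # iterate the list instead
print(hashtags)  # ['spark', 'grafana']
```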
Finally, after reading many resources, a lot of trial and error, and removing some of the fields that contained lists from the table, I simply changed the code to:
# split each line of data
objects = data.splitlines()
for line in objects:
    # loads the data into json object
    d = json.loads(line)
    # and then process the data...........
It was finally able to insert the data into the PostgreSQL database. :)
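For completeness, here is a minimal, self-contained sketch of the pattern that ended up working for me (the table and column names come from my snippets above; treating HASHTAGS as a plain text column and using placeholder connection values are assumptions made for the sake of the example):

```python
import json
import psycopg2

def insert_tweets(data, conn):
    """Parse a multi-line payload from on_data and insert one row per JSON object."""
    cursor = conn.cursor()
    for line in data.splitlines():
        if not line.strip():
            continue                      # skip keep-alive / empty lines
        d = json.loads(line)              # one JSON object per line
        # prefer the extended tweet text when present
        if 'extended_tweet' in d:
            teks_twit = d['extended_tweet']['full_text']
        else:
            teks_twit = d.get('text', '')
        if 'RT @' in teks_twit:
            continue                      # skip retweets
        hashtags = [h['text'] for h in d.get('entities', {}).get('hashtags', [])]
        cursor.execute(
            '''INSERT INTO rippleet_tweet (SCREENNAME, TWEET, HASHTAGS, WORDCOUNTS)
               VALUES (%s, %s, %s, %s)''',        # %s for every placeholder
            (d['user']['screen_name'],
             teks_twit,
             ' '.join(hashtags),                  # assumes a plain text column
             len(teks_twit.split()))
        )
    conn.commit()
    cursor.close()

# usage (connection values are placeholders for my settings module)
# conn = psycopg2.connect(user='...', password='...', host='...', port=5432, database='...')
# insert_tweets(data, conn)
```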