How do I parse multiple JSON objects from the tweepy on_data stream and insert them into PostgreSQL in Python?

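I am trying to parse the data in the multiple JSON objects returned by tweepy's on_data method (each line of the stream contains one JSON object, and each chunk of the stream contains one to several lines) and send it to a PostgreSQL database. However, it always fails to insert the record into the table and shows the message TypeError('list indices must be integers, not str').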

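I am not sure, but I think the problem is in the code where I call data.splitlines() and read and process each line before putting it into PostgreSQL. When I print the data to the console, it always returns multiple lines of data, so I cannot process them one at a time.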
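
For reference, here is a minimal sketch of the per-line parsing I am aiming for. It assumes each non-empty line holds exactly one complete JSON object, as described above; `iter_json_lines` is a hypothetical helper name, not part of tweepy, and the sample chunk is made up.

```python
import json


def iter_json_lines(data):
    """Yield one parsed object per non-empty line of `data`."""
    for line in data.splitlines():
        line = line.strip()
        if not line:
            # Skip blank lines (assumed here to carry no tweet).
            continue
        yield json.loads(line)


# Hypothetical chunk holding two objects, one per line.
chunk = (
    '{"user": {"screen_name": "fooUser"}}\r\n'
    '{"user": {"screen_name": "userNumber2"}}\r\n'
)
names = [d["user"]["screen_name"] for d in iter_json_lines(chunk)]
print(names)
```

Each object yielded this way is a separate dict, so anything done inside the loop body should use that dict rather than the whole `data` string.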

I am setting up a Python script that collects Twitter stream data with tweepy and tries to store it in Apache Spark and a PostgreSQL database, so that I can load it into Grafana.

I have also tried several solutions from here, here, and here. However, the problem still seems to be there.

def on_data(self, data):
        (width, height) = getTerminalSize()
        try:
            # data arrives as multiple JSON objects, one per line; but when I read it line by line with splitlines() and a for loop, it still seems to return multiple lines, so I can't insert into the PostgreSQL table
            lines = data.splitlines()
            for line in lines:
                d = json.loads(line)                  
                # processing the data
                if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                    teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                    print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                    print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                    print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                    # send data to socket for processing in spark
                    self.client_socket.send(data)
                    # insert record to table
                    try:
                        conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                        cursor = conn.cursor()
                        # insert ops
                        item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                        sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                        cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                        conn.commit()
                        count = cursor.rowcount
                        print (count, " Record inserted successfully into table")
                    except (Exception, psycopg2.Error) as error:
                        if(conn):
                            print("Failed to insert record into table", error)
                    finally:
                        #closing database connection.
                        if(conn):
                            cursor.close()
                            conn.close()
                            print(" [*] PostgreSQL connection is closed")
                    print console_colors.GREY+ "_" * width + console_colors.ENDC

                elif 'extended_tweet' not in d and 'RT @' not in d['text'].encode('utf-8'):
                    teks_twit = str(d['text'].encode('utf-8'))
                    print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
                    print("GMT: "+console_colors.WHITE+time.strftime("%a, %d %b %Y %I:%M:%S GMT"+console_colors.ENDC, time.gmtime()))
                    print("Local: "+console_colors.WHITE+strftime("%a, %d %b %Y %I:%M:%S %p %Z (GMT+7)\r"+console_colors.ENDC))
                    # send data to socket for processing in spark
                    self.client_socket.send(data)
                    # insert record to table
                    try:
                        conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                        cursor = conn.cursor()
                        # insert ops
                        item = {'SCREENNAME': str(d['user']['screen_name'].encode('utf-8')), 'TWEET': teks_twit, 'WORDS': teks_twit.split(), 'HASHTAGS': str(d['entities']['hashtags']['text']), 'WORDCOUNTS': len([teks_twit.split()])}
                        sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDS, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                        cursor.execute(sqlquery, (item['SCREENNAME'],item['TWEET'],item['WORDS'],item['HASHTAGS'],item['WORDCOUNTS']))
                        conn.commit()
                        count = cursor.rowcount
                        print (count, " Record inserted successfully into table")
                    except (Exception, psycopg2.Error) as error :
                        if(conn):
                            print("Failed to insert record into table", error)
                    finally:
                        #closing database connection.
                        if(conn):
                            cursor.close()
                            conn.close()
                            print(" [*] PostgreSQL connection is closed")
                    print console_colors.GREY+ "_" * width + console_colors.ENDC

                else:
                    pass

        except BaseException as e:
            print("Error on_data: %s" % str(e))
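
Two details in the insert above may be worth checking. As far as I know, psycopg2 accepts only `%s` (or `%(name)s`) as a parameter placeholder, even for integers, so the `%d` in the VALUES list is likely to fail on its own. Also, `len([teks_twit.split()])` wraps the word list in another list, so it always evaluates to 1. Below is a minimal sketch under those assumptions; `build_row` and `insert_tweet` are hypothetical helper names, `cursor` is assumed to be an open psycopg2 cursor, and only three of the table's columns are used to keep it short.

```python
INSERT_SQL = (
    "INSERT INTO rippleet_tweet (SCREENNAME, TWEET, WORDCOUNTS) "
    "VALUES (%s, %s, %s)"
)


def build_row(d):
    """Build the parameter tuple for one parsed tweet dict."""
    if "extended_tweet" in d:
        text = d["extended_tweet"]["full_text"]
    else:
        text = d["text"]
    # Count the words directly; no extra list around split().
    return (d["user"]["screen_name"], text, len(text.split()))


def insert_tweet(cursor, d):
    """Run the parameterized insert for one tweet on an open cursor."""
    cursor.execute(INSERT_SQL, build_row(d))


# Hypothetical parsed tweet, used only to show the resulting row.
row = build_row({"user": {"screen_name": "fooUser"}, "text": "halo dunia twitter"})
print(row)
```

Keeping the row construction separate from the database call makes it possible to inspect the exact values before anything is sent to PostgreSQL.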

Example of the expected result for each iteration of the for loop:

d['user']['screen_name']
fooUser

Only one result per iteration.

Actual result:

d['user']['screen_name']
fooUser
userNumber2
anotherUser

Each iteration of the for loop gives more than one result.

Update:

I updated the code, and it seems that d['user']['screen_name'], as well as the other keys, now successfully returns a single item right before it is inserted into the PostgreSQL table.

However, the TypeError('list indices must be integers, not str') message is still there. I am still trying other options.
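
A likely source of that TypeError is the expression `d['entities']['hashtags']['text']`. Assuming the payload follows the usual tweet shape, where `entities.hashtags` is a list of objects that each have their own `text` key, indexing that list with a string raises exactly this error. A minimal sketch under that assumption; `hashtag_texts` is a hypothetical helper name and the sample tweet is made up:

```python
def hashtag_texts(d):
    """Return the text of every hashtag in a parsed tweet dict."""
    hashtags = d.get("entities", {}).get("hashtags", [])
    return [tag["text"] for tag in hashtags]


# Hypothetical tweet fragment with two hashtags.
tweet = {
    "entities": {
        "hashtags": [
            {"text": "python", "indices": [0, 7]},
            {"text": "tweepy", "indices": [8, 15]},
        ]
    }
}
tags = hashtag_texts(tweet)
print(tags)

# Indexing the list with a string reproduces the kind of error in the question.
try:
    tweet["entities"]["hashtags"]["text"]
except TypeError as exc:
    print("TypeError:", exc)
```

The resulting list could then be joined into one string (for example with `",".join(tags)`) before being stored in a text column.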

These are the changes:

...
try:
            # split each line of data
            for item in data.splitlines():
                # loads the data into json object
                d = json.loads(item)  
                # processing the data
                if d["user"]["lang"] == "id":
                    if 'extended_tweet' in d and 'RT @' not in d['extended_tweet']['full_text'].encode('utf-8'):
                        teks_twit = str(d['extended_tweet']['full_text'].encode('utf-8'))
                        print console_colors.OKBLUE + str(d['user']['screen_name'].encode('utf-8')) + ": " + console_colors.ENDC + teks_twit + "\n"
...

...
try:
                            conn = psycopg2.connect(user=settings.RIPPLEET_USER,password=settings.RIPPLEET_PASS,host=settings.PSQL_HOST,port=settings.PSQL_PORT,database=settings.PSQL_DB)
                            cursor = conn.cursor()
                            # insert ops
                            # check if the data is already separated
                            print d['user']['screen_name']
                            # insert data into table
                            sqlquery = ''' INSERT INTO rippleet_tweet (SCREENNAME, TWEET, HASHTAGS, WORDCOUNTS) VALUES (%s,%s,%s,%s,%d) '''
                            cursor.execute(sqlquery, (str(d['user']['screen_name'].encode('utf-8')), str(d['extended_tweet']['full_text'].encode('utf-8')), str(d['entities']['hashtags']['text']), len(str(d['extended_tweet']['full_text'].encode('utf-8')).split())))

                            conn.commit()
                            count = cursor.rowcount
                            print (count, " Record inserted successfully into table")
                        except (Exception, psycopg2.Error) as error:
                            if(conn):
                                print("Failed to insert record into table", error)
                        finally:
                            #closing database connection.
                            if(conn):
                                cursor.close()
                                conn.close()
                                print(" [*] PostgreSQL connection is closed")
...

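Finally, after reading many resources, going through some trial and error, and removing the fields in the table that contained lists, I tried simply changing the code to: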

# split each line of data
     objects = data.splitlines()
     for line in objects:
        # loads the data into json object
        d = json.loads(line)
        # and then process the data...........

It was finally able to insert the data into the PostgreSQL database. :)