Unclear on error message in upload from pandas to Google BigQuery table

Situation

I'm trying to upload a pandas dataframe of Twitter API data to a table in BigQuery.

Here's the dataframe preparation code, from a Google Colab notebook:

!pip install --upgrade google-cloud-language
!pip install pandas-gbq -U

from google.colab import files
uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(name=fn, length=len(uploaded[fn])))

import os

# Imports Credential File:
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "pp-004a-d61bf3451d85.json"
print("Service Account Key: {}".format(os.environ["GOOGLE_APPLICATION_CREDENTIALS"]))

!pip install --upgrade tweepy

# VARIABLES

interval = "15"
start = '2022-04-07'
end = '2022-04-12'

# Tweepy
searchQ = '(max muncy) -is:retweet lang:en'
intval_tw = "{}T".format(interval)
start_tw = '{}T00:00:00Z'.format(start)
end_tw   = '{}T23:59:59Z'.format(end)

# index = pd.date_range('1/1/2000', periods=9, freq='T')
# D = calendar day frequency, H = hourly frequency, T, min = minutely frequency

# Library installs

import tweepy
# from twitter_authentication import bearer_token
import time
import pandas as pd
import requests
import json
import numpy as np

bearer_token = "BEARER_TOKEN"


client = tweepy.Client(bearer_token, wait_on_rate_limit=True)

# NEED TO ENSURE HAVE ALL PARAMETERS
gathered_tweets = []
for response in tweepy.Paginator(client.search_recent_tweets,
                                 query = searchQ,
                                 user_fields = ['name', 'description', 'username', 'profile_image_url', 'url', 'pinned_tweet_id', 'verified', 'created_at', 'location', 'public_metrics', 'entities'],
                                 tweet_fields = ['public_metrics', 'created_at','lang', 'attachments', 'context_annotations', 'conversation_id', 'entities', 'geo', 'in_reply_to_user_id', 'possibly_sensitive', 'referenced_tweets', 'reply_settings', 'source'],
                                 media_fields = ['duration_ms', 'media_key', 'preview_image_url', 'type', 'url', 'height', 'width', 'public_metrics'],
                                 expansions = ['author_id', 'attachments.media_keys', 'entities.mentions.username', 'geo.place_id', 'in_reply_to_user_id', 'referenced_tweets.id', 'referenced_tweets.id.author_id'],
                                 start_time = start_tw,
                                 end_time = end_tw,
                                 max_results=100):
    time.sleep(1)
    gathered_tweets.append(response)



result = []
user_dict = {}
# Loop through each response object
for response in gathered_tweets:
  # Take all of the users, and put them into a dictionary of dictionaries with the info we want to keep
  for user in response.includes['users']:
      user_dict[user.id] = {'username': user.username,
                            'created_at': user.created_at,
                            'location': user.location,
                            'verified': user.verified,
                            'name': user.name,
                            'description': user.description,
                            'url': user.url,
                            'profile_image_url': user.profile_image_url,
                            'pinned_tweet': user.pinned_tweet_id,
                            'entities': user.entities,
                            'followers': user.public_metrics['followers_count'],
                            'total_tweets': user.public_metrics['tweet_count'],
                            'following': user.public_metrics['following_count'],
                            'listed': user.public_metrics['listed_count'],
                            'tweets': user.public_metrics['tweet_count']
                            }
  for tweet in response.data:
      # For each tweet, find the author's information
      author_info = user_dict[tweet.author_id]
      # Put all of the information we want to keep in a single dictionary for each tweet
      result.append({'author_id': tweet.author_id,
                   'username': author_info['username'],
                   'name': author_info['name'],
                   'author_followers': author_info['followers'],
                   'author_following': author_info['following'],
                   'author_tweets': author_info['tweets'],
                   'author_description': author_info['description'],
                   'author_url': author_info['url'],
                   'profile_image_url': author_info['profile_image_url'],
                   #'pinned_tweet': author_info['pinned_tweet_id'], https://developer.twitter.com/en/docs/twitter-api/tweets/lookup/api-reference/get-tweets
                   #'total_tweets': author_info['tweet_count'],
                   #'listed_count': author_info['listed_count'],
                   'entities': author_info['entities'],
                   'verified': author_info['verified'],
                   'account_created_at': author_info['created_at'],
                   'text': tweet.text,
                   'created_at': tweet.created_at,
                   'lang': tweet.lang,
                   'tweet_id': tweet.id,
                   'retweets': tweet.public_metrics['retweet_count'],
                   'replies': tweet.public_metrics['reply_count'],
                   'likes': tweet.public_metrics['like_count'],
                   'quotes': tweet.public_metrics['quote_count'],
                   'replied': tweet.in_reply_to_user_id,
                   'sensitive': tweet.possibly_sensitive,
                   'referenced_tweets': tweet.referenced_tweets,
                   'reply_settings': tweet.reply_settings,
                   'source': tweet.source
                   #'video_views': tweet.public_metrics['view_count']
                   })

dfTW00 = pd.DataFrame(result)

# Work on a copy so later edits don't also mutate dfTW00
dfTW01 = dfTW00.copy()

# Create 'engagement' metric
dfTW01['engagement'] = dfTW01['retweets'] + dfTW01['replies'] + dfTW01['likes'] + dfTW01['quotes']

# Add 'tweets' column with value of 1
dfTW01['tweets'] = 1

# Engagement Rate calc
dfTW01['eng_rate'] = (dfTW01['tweets'] / dfTW01['engagement'])

# Add twitter link
dfTW01['base_url'] = 'https://twitter.com/twitter/status/'
# base_url = 'https://twitter.com/twitter/status/'
dfTW01['tweet_link'] = dfTW01['base_url'] + dfTW01['tweet_id'].astype(str)

# Imports the Google Cloud client library
from google.cloud import language_v1

# Instantiates a client
client = language_v1.LanguageServiceClient()


def get_sentiment(text):
    # The text to analyze
    document = language_v1.Document(
        content=text,
        type_=language_v1.types.Document.Type.PLAIN_TEXT
    )

    # Detects the sentiment of the text
    sentiment = client.analyze_sentiment(
        request={"document": document}
    ).document_sentiment

    return sentiment


dfTW01["sentiment"] = dfTW01["text"].apply(get_sentiment)

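# Split the Sentiment object's string form ('magnitude: x score: y') on
# whitespace; columns 1 and 3 of the result hold the numeric values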
dfTW02 = dfTW01['sentiment'].astype(str).str.split(expand=True)
dfTW02

dfTW03 = pd.merge(dfTW01, dfTW02, left_index=True, right_index=True)

dfTW03.rename(columns = {1:'magnitude', 3:'score'}, inplace=True)

cols = ['magnitude', 'score']
dfTW03[cols] = dfTW03[cols].apply(pd.to_numeric, errors='coerce', axis=1)

def return_status(x):
    if x >= .5:
        return 'Positive'
    elif x <= -.5:
        return 'Negative'
    return 'Neutral'

dfTW03['sentiment2'] = dfTW03['score'].apply(return_status)
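As an aside, magnitude and score can also be read directly off the Sentiment objects instead of parsing their string form; a minimal sketch (untested against the pipeline above):

dfTW01['magnitude'] = dfTW01['sentiment'].apply(lambda s: s.magnitude)
dfTW01['score'] = dfTW01['sentiment'].apply(lambda s: s.score)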

What I've tried

Here's what I'm using for the upload (I've confirmed the project, dataset, and table information are correct):

df.to_gbq('004a01.004a-TW-01',
          'pp-004a',
          chunksize=None,
          if_exists='append'
          )

Result

However, that method returns this error message:

TypeError: '<' not supported between instances of 'int' and 'str'

Assessment

I've found several posts on SO addressing this error, but I haven't been able to relate them to my situation. (My understanding is that a variety of data types can be uploaded to a BigQuery table.)

First of all, I'm not clear on what the error message '<' not supported between instances of 'int' and 'str' means.
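My best guess is that the comparison happens while a schema is being inferred for the upload: if something in the dataframe mixes Python ints and strs (either values inside an object column, or the column labels themselves), sorting or comparing them would raise exactly this TypeError. A quick check for mixed-type columns (a sketch against dfTW03, the frame prepared above; untested):

for col in dfTW03.columns:
    value_types = dfTW03[col].map(type).unique()
    if len(value_types) > 1:
        print(col, value_types)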

Any input would be greatly appreciated.

In case it's helpful, below are the pandas dtypes in my dataframe.

Dataframe dtypes

author_id                           int64
username                           object
name                               object
author_followers                    int64
author_following                    int64
author_tweets                       int64
author_description                 object
author_url                         object
profile_image_url                  object
entities                           object
verified                             bool
account_created_at    datetime64[ns, UTC]
text                               object
created_at            datetime64[ns, UTC]
lang                               object
tweet_id                            int64
retweets                            int64
replies                             int64
likes                               int64
quotes                              int64
replied                           float64
sensitive                            bool
referenced_tweets                  object
reply_settings                     object
source                             object
engagement                          int64
tweets                              int64
eng_rate                          float64
base_url                           object
tweet_link                         object
sentiment                          object
0                                  object
magnitude                         float64
2                                  object
score                             float64
sentiment_rating                  float64
sentiment2                         object
dtype: object
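One detail that stands out in that listing: two columns are literally named 0 and 2 (ints left over from the str.split() / merge step), while every other label is a string, so the int-vs-str comparison may be between column names rather than values. A hedged sketch of a possible fix (the replacement names are ones I made up):

# Give the leftover integer-named columns string names
dfTW03 = dfTW03.rename(columns={0: 'sentiment_raw_0', 2: 'sentiment_raw_2'})

# Serialize the structured object columns so their BigQuery type is unambiguous
for col in ['entities', 'referenced_tweets', 'sentiment']:
    dfTW03[col] = dfTW03[col].astype(str)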

Instead of the to_gbq() function from pandas, you can try using the load_table_from_dataframe() function from the BigQuery library to load the dataframe into BigQuery.

Please see the sample Python code below using load_table_from_dataframe():

import datetime

from google.cloud import bigquery
import pandas
import pytz

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "my-project.my-dataset.my-table"

records = [
    {
        "title": "The Meaning of Life",
        "release_year": 1983,
        "length_minutes": 112.5,
        "release_date": pytz.timezone("Europe/Paris")
        .localize(datetime.datetime(1983, 5, 9, 13, 0, 0))
        .astimezone(pytz.utc),
        # Assume UTC timezone when a datetime object contains no timezone.
        "dvd_release": datetime.datetime(2002, 1, 22, 7, 0, 0),
    },
    {
        "title": "Monty Python and the Holy Grail",
        "release_year": 1975,
        "length_minutes": 91.5,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1975, 4, 9, 23, 59, 2))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2002, 7, 16, 9, 0, 0),
    },
    {
        "title": "Life of Brian",
        "release_year": 1979,
        "length_minutes": 94.25,
        "release_date": pytz.timezone("America/New_York")
        .localize(datetime.datetime(1979, 8, 17, 23, 59, 5))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2008, 1, 14, 8, 0, 0),
    },
    {
        "title": "And Now for Something Completely Different",
        "release_year": 1971,
        "length_minutes": 88.0,
        "release_date": pytz.timezone("Europe/London")
        .localize(datetime.datetime(1971, 9, 28, 23, 59, 7))
        .astimezone(pytz.utc),
        "dvd_release": datetime.datetime(2003, 10, 22, 10, 0, 0),
    },
]
dataframe = pandas.DataFrame(
    records,
    # In the loaded table, the column order reflects the order of the
    # columns in the DataFrame.
    columns=[
        "title",
        "release_year",
        "length_minutes",
        "release_date",
        "dvd_release",
    ],
    # Optionally, set a named index, which can also be written to the
    # BigQuery table.
    index=pandas.Index(
        ["Q24980", "Q25043", "Q24953", "Q16403"], name="wikidata_id"
    ),
)
job_config = bigquery.LoadJobConfig(
    # Specify a (partial) schema. All columns are always written to the
    # table. The schema is used to assist in data type definitions.
    schema=[
        # Specify the type of columns whose type cannot be auto-detected. For
        # example the "title" column uses pandas dtype "object", so its
        # data type is ambiguous.
        bigquery.SchemaField("title", bigquery.enums.SqlTypeNames.STRING),
        # Indexes are written if included in the schema by name.
        bigquery.SchemaField("wikidata_id", bigquery.enums.SqlTypeNames.STRING),
    ],
    # Optionally, set the write disposition. BigQuery appends loaded rows
    # to an existing table by default, but with WRITE_TRUNCATE write
    # disposition it replaces the table with the loaded data.
    write_disposition="WRITE_TRUNCATE"
)

job = client.load_table_from_dataframe(
    dataframe, table_id, job_config=job_config
)  # Make an API request.
job.result()  # Wait for the job to complete.

table = client.get_table(table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        table.num_rows, len(table.schema), table_id
    )
)
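Applied to the dataframe from the question, a minimal sketch might look like the following (the project/dataset/table path and column names are taken from your post; the STRING overrides are assumptions on my part):

job_config = bigquery.LoadJobConfig(
    # Pin the ambiguous object columns to STRING so their type does not
    # have to be inferred from the values.
    schema=[
        bigquery.SchemaField("entities", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("referenced_tweets", bigquery.enums.SqlTypeNames.STRING),
    ],
    # Append to the existing table, matching if_exists='append' in to_gbq().
    write_disposition="WRITE_APPEND",
)

job = client.load_table_from_dataframe(
    dfTW03, "pp-004a.004a01.004a-TW-01", job_config=job_config
)
job.result()  # Wait for the load job to complete.

If you would rather stay with pandas, to_gbq() accepts a similar table_schema argument (a list of dicts such as {'name': 'entities', 'type': 'STRING'}) that serves the same purpose.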