Pushshift Reddit 网络抓取循环中的代码 efficiency/performance 改进
Code efficiency/performance improvement in Pushshift Reddit web scraping loop
我正在通过 Pushshift API 提取 Reddit 数据。更准确地说,我对 subreddit X 中搜索词 Y 的评论和帖子(提交)感兴趣,从现在到日期时间 Z(例如,subreddit /rwallstreetbets 中所有提到“GME”的评论)。所有这些参数都可以指定。到目前为止,我使用以下代码让它工作:
import pandas as pd
import requests
from datetime import datetime
import traceback
import time
import json
import sys
import numpy as np
username = "" # put the username you want to download in the quotes
subreddit = "gme" # put the subreddit you want to download in the quotes
search_query = "gamestop" # put the word you want to search for (present in comment or post) in the quotes
# leave either one blank to download an entire user's, subreddit's, or search word's history
# or fill in all to download a specific users history from a specific subreddit mentioning a specific word
filter_string = None
if username == "" and subreddit == "" and search_query == "":
print("Fill in either username or subreddit")
sys.exit(0)
elif username == "" and subreddit != "" and search_query == "":
filter_string = f"subreddit={subreddit}"
elif username != "" and subreddit == "" and search_query == "":
filter_string = f"author={username}"
elif username == "" and subreddit != "" and search_query != "":
filter_string = f"subreddit={subreddit}&q={search_query}"
elif username == "" and subreddit == "" and search_query != "":
filter_string = f"q={search_query}"
else:
filter_string = f"author={username}&subreddit={subreddit}&q={search_query}"
url = "https://api.pushshift.io/reddit/search/{}/?size=500&sort=desc&{}&before="
start_time = datetime.utcnow()
def redditAPI(object_type):
global df_comments
df_comments = pd.DataFrame(columns=["date", "comment", "score", "id"])
global df_posts
df_posts = pd.DataFrame(columns=["date", "post", "score", "id"])
print(f"\nLooping through {object_type}s and append to dataframe...")
count = 0
previous_epoch = int(start_time.timestamp())
while True:
# Ensures that loop breaks at March 16 2021 for testing purposes
if previous_epoch <= 1615849200:
break
new_url = url.format(object_type, filter_string)+str(previous_epoch)
json_text = requests.get(new_url)
time.sleep(1) # pushshift has a rate limit, if we send requests too fast it will start returning error messages
try:
json_data = json.loads(json_text.text)
except json.decoder.JSONDecodeError:
time.sleep(1)
continue
if 'data' not in json_data:
break
objects = json_data['data']
if len(objects) == 0:
break
df2 = pd.DataFrame.from_dict(objects)
for object in objects:
previous_epoch = object['created_utc'] - 1
count += 1
if object_type == "comment":
df2.rename(columns={'created_utc': 'date', 'body': 'comment'}, inplace=True)
df_comments = df_comments.append(df2[['date', 'comment', 'score']])
elif object_type == "submission":
df2.rename(columns={'created_utc': 'date', 'selftext': 'post'}, inplace=True)
df_posts = df_posts.append(df2[['date', 'post', 'score']])
# Convert UNIX to datetime
df_comments["date"] = pd.to_datetime(df_comments["date"],unit='s')
df_posts["date"] = pd.to_datetime(df_posts["date"],unit='s')
# Drop blank rows (the case when posts only consists of an image)
df_posts['post'].replace('', np.nan, inplace=True)
df_posts.dropna(subset=['post'], inplace=True)
# Drop duplicates (see last comment on https://www.reddit.com/r/pushshift/comments/b7onr6/max_number_of_results_returned_per_query/)
df_comments = df_comments.drop_duplicates()
df_posts = df_posts.drop_duplicates()
print("\nDone. Saved to dataframe.")
不幸的是,我确实遇到了一些性能问题。由于我基于 created_utc - 1 进行分页(因为我不想错过任何 comments/posts),初始数据框将包含重复项(因为不会有 100 (=API limit) new comments/posts every new second).如果我 运行 长时间框架的代码(例如当前时间 - 2021 年 3 月 1 日),这将导致一个巨大的数据帧,需要相当长的时间来处理。
正如现在的代码一样,重复项被添加到数据框中,只有在循环之后,它们才会被删除。有没有办法提高效率?例如。在 for 循环中检查对象是否已存在于数据框中?这会有所作为,性能明智吗?任何输入将不胜感激。
我建议使用布隆过滤器来检查值是否已经通过。
PyPi 上有一个包可以很容易地实现这一点。要使用布隆过滤器,您只需向过滤器添加一个“键”,这可以是用户名和评论的组合。通过这种方式,您可以检查您是否已经向数据框添加了评论。我建议您在您的方法中尽早使用布隆过滤器,即在您收到 API.
的响应之后
查询数据是可以做到一开始就没有重复的。
您正在使用 API 的 before
参数,允许仅获取严格在时间戳之前的记录。这意味着我们可以在每次迭代中将我们已有的最早记录的时间戳发送为 before
。在这种情况下,作为响应,我们只会有我们还没有看到的记录,所以没有重复。
在看起来像这样的代码中:
import pandas as pd
import requests
import urllib
import time
import json
def get_data(object_type, username='', subreddit='', search_query='', max_time=None, min_time=1615849200):
# start from current time if not specified
if max_time is None:
max_time = int(time.time())
# generate filter string
filter_string = urllib.parse.urlencode(
{k: v for k, v in zip(
['author', 'subreddit', 'q'],
[username, subreddit, search_query]) if v != ""})
url_format = "https://api.pushshift.io/reddit/search/{}/?size=500&sort=desc&{}&before={}"
before = max_time
df = pd.DataFrame()
while before > min_time:
url = url_format.format(object_type, filter_string, before)
resp = requests.get(url)
# convert records to dataframe
dfi = pd.json_normalize(json.loads(resp.text)['data'])
if object_type == 'comment':
dfi = dfi.rename(columns={'created_utc': 'date', 'body': 'comment'})
df = pd.concat([df, dfi[['id', 'date', 'comment', 'score']]])
elif object_type == 'submission':
dfi = dfi.rename(columns={'created_utc': 'date', 'selftext': 'post'})
dfi = dfi[dfi['post'].ne('')]
df = pd.concat([df, dfi[['id', 'date', 'post', 'score']]])
# set `before` to the earliest comment/post in the results
# next time we call requests.get(...) we will only get comments/posts before
# the earliest that we already have, thus not fetching any duplicates
before = dfi['date'].min()
# if needed
# time.sleep(1)
return df
通过获取评论和检查重复值进行测试(id
):
username = ""
subreddit = "gme"
search_query = "gamestop"
df_comments = get_data(
object_type='comment',
username=username,
subreddit=subreddit,
search_query=search_query)
df_comments['id'].duplicated().any() # False
df_comments['id'].nunique() # 2200
我正在通过 Pushshift API 提取 Reddit 数据。更准确地说,我对 subreddit X 中搜索词 Y 的评论和帖子(提交)感兴趣,从现在到日期时间 Z(例如,subreddit /rwallstreetbets 中所有提到“GME”的评论)。所有这些参数都可以指定。到目前为止,我使用以下代码让它工作:
import pandas as pd
import requests
from datetime import datetime
import traceback
import time
import json
import sys
import numpy as np
username = "" # put the username you want to download in the quotes
subreddit = "gme" # put the subreddit you want to download in the quotes
search_query = "gamestop" # put the word you want to search for (present in comment or post) in the quotes
# leave either one blank to download an entire user's, subreddit's, or search word's history
# or fill in all to download a specific users history from a specific subreddit mentioning a specific word
filter_string = None
if username == "" and subreddit == "" and search_query == "":
print("Fill in either username or subreddit")
sys.exit(0)
elif username == "" and subreddit != "" and search_query == "":
filter_string = f"subreddit={subreddit}"
elif username != "" and subreddit == "" and search_query == "":
filter_string = f"author={username}"
elif username == "" and subreddit != "" and search_query != "":
filter_string = f"subreddit={subreddit}&q={search_query}"
elif username == "" and subreddit == "" and search_query != "":
filter_string = f"q={search_query}"
else:
filter_string = f"author={username}&subreddit={subreddit}&q={search_query}"
url = "https://api.pushshift.io/reddit/search/{}/?size=500&sort=desc&{}&before="
start_time = datetime.utcnow()
def redditAPI(object_type):
global df_comments
df_comments = pd.DataFrame(columns=["date", "comment", "score", "id"])
global df_posts
df_posts = pd.DataFrame(columns=["date", "post", "score", "id"])
print(f"\nLooping through {object_type}s and append to dataframe...")
count = 0
previous_epoch = int(start_time.timestamp())
while True:
# Ensures that loop breaks at March 16 2021 for testing purposes
if previous_epoch <= 1615849200:
break
new_url = url.format(object_type, filter_string)+str(previous_epoch)
json_text = requests.get(new_url)
time.sleep(1) # pushshift has a rate limit, if we send requests too fast it will start returning error messages
try:
json_data = json.loads(json_text.text)
except json.decoder.JSONDecodeError:
time.sleep(1)
continue
if 'data' not in json_data:
break
objects = json_data['data']
if len(objects) == 0:
break
df2 = pd.DataFrame.from_dict(objects)
for object in objects:
previous_epoch = object['created_utc'] - 1
count += 1
if object_type == "comment":
df2.rename(columns={'created_utc': 'date', 'body': 'comment'}, inplace=True)
df_comments = df_comments.append(df2[['date', 'comment', 'score']])
elif object_type == "submission":
df2.rename(columns={'created_utc': 'date', 'selftext': 'post'}, inplace=True)
df_posts = df_posts.append(df2[['date', 'post', 'score']])
# Convert UNIX to datetime
df_comments["date"] = pd.to_datetime(df_comments["date"],unit='s')
df_posts["date"] = pd.to_datetime(df_posts["date"],unit='s')
# Drop blank rows (the case when posts only consists of an image)
df_posts['post'].replace('', np.nan, inplace=True)
df_posts.dropna(subset=['post'], inplace=True)
# Drop duplicates (see last comment on https://www.reddit.com/r/pushshift/comments/b7onr6/max_number_of_results_returned_per_query/)
df_comments = df_comments.drop_duplicates()
df_posts = df_posts.drop_duplicates()
print("\nDone. Saved to dataframe.")
不幸的是,我确实遇到了一些性能问题。由于我基于 created_utc - 1 进行分页(因为我不想错过任何 comments/posts),初始数据框将包含重复项(因为不会有 100 (=API limit) new comments/posts every new second).如果我 运行 长时间框架的代码(例如当前时间 - 2021 年 3 月 1 日),这将导致一个巨大的数据帧,需要相当长的时间来处理。
正如现在的代码一样,重复项被添加到数据框中,只有在循环之后,它们才会被删除。有没有办法提高效率?例如。在 for 循环中检查对象是否已存在于数据框中?这会有所作为,性能明智吗?任何输入将不胜感激。
我建议使用布隆过滤器来检查值是否已经通过。
PyPi 上有一个包可以很容易地实现这一点。要使用布隆过滤器,您只需向过滤器添加一个“键”,这可以是用户名和评论的组合。通过这种方式,您可以检查您是否已经向数据框添加了评论。我建议您在您的方法中尽早使用布隆过滤器,即在您收到 API.
的响应之后查询数据是可以做到一开始就没有重复的。
您正在使用 API 的 before
参数,允许仅获取严格在时间戳之前的记录。这意味着我们可以在每次迭代中将我们已有的最早记录的时间戳发送为 before
。在这种情况下,作为响应,我们只会有我们还没有看到的记录,所以没有重复。
在看起来像这样的代码中:
import pandas as pd
import requests
import urllib
import time
import json
def get_data(object_type, username='', subreddit='', search_query='', max_time=None, min_time=1615849200):
# start from current time if not specified
if max_time is None:
max_time = int(time.time())
# generate filter string
filter_string = urllib.parse.urlencode(
{k: v for k, v in zip(
['author', 'subreddit', 'q'],
[username, subreddit, search_query]) if v != ""})
url_format = "https://api.pushshift.io/reddit/search/{}/?size=500&sort=desc&{}&before={}"
before = max_time
df = pd.DataFrame()
while before > min_time:
url = url_format.format(object_type, filter_string, before)
resp = requests.get(url)
# convert records to dataframe
dfi = pd.json_normalize(json.loads(resp.text)['data'])
if object_type == 'comment':
dfi = dfi.rename(columns={'created_utc': 'date', 'body': 'comment'})
df = pd.concat([df, dfi[['id', 'date', 'comment', 'score']]])
elif object_type == 'submission':
dfi = dfi.rename(columns={'created_utc': 'date', 'selftext': 'post'})
dfi = dfi[dfi['post'].ne('')]
df = pd.concat([df, dfi[['id', 'date', 'post', 'score']]])
# set `before` to the earliest comment/post in the results
# next time we call requests.get(...) we will only get comments/posts before
# the earliest that we already have, thus not fetching any duplicates
before = dfi['date'].min()
# if needed
# time.sleep(1)
return df
通过获取评论和检查重复值进行测试(id
):
username = ""
subreddit = "gme"
search_query = "gamestop"
df_comments = get_data(
object_type='comment',
username=username,
subreddit=subreddit,
search_query=search_query)
df_comments['id'].duplicated().any() # False
df_comments['id'].nunique() # 2200