How to speed up Spotipy API calls for millions of records?

I'm trying to get audio-feature data for roughly 4.5 years of Spotify Top 200 charts. That covers 68 countries plus a global chart, about 20 million records in total. I'm querying a SQLite database that contains all of this data. This is prep work for a data-analysis project, and I'm currently limiting my scope to the third Friday of each month, because the fastest I can pull audio features for one full day of charts is 15.8 minutes. That works out to 18.5 days of straight processing to cover all 1,701 days.

Does anyone see a way I could make this faster? I'm currently calling the spotipy audio_features() function once per track ID. The function accepts up to 100 IDs at a time, but I'm not sure whether that would be faster.
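For reference, passing a list of up to 100 IDs to `audio_features()` replaces up to 100 HTTP round trips with one, which is where most of the per-call latency goes. A minimal sketch of the chunking (the `chunked` helper is mine, not part of spotipy):

```python
def chunked(seq, size=100):
    """Yield successive slices of at most `size` items from seq."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

# Hypothetical usage -- one API call per 100 tracks instead of one per track:
# track_ids = [entry[-1] for entry in data_with_track_ids]
# for batch in chunked(track_ids):
#     results = spotify.audio_features(batch)  # list of dicts (or None), in order
```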

Here's a sample entry before processing:

column_names = ['title', 'rank', 'date', 'artist', 'url', 'region', 'chart', 'trend', 'streams']

('You Were Right', 179, '2017-01-20', 'RÜFÜS DU SOL', 'https://open.spotify.com/track/77lqbary6vt1DSc1MBN6sx', 'Australia', 'top200', 'NEW_ENTRY', 14781)

After processing:

column_names = ['title', 'rank', 'date', 'artist', 'url', 'region', 'chart', 'trend', 'streams', 'track_id', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'duration_ms', 'time_signature']

('You Were Right', 179, '2017-01-20', 'RÜFÜS DU SOL', 'https://open.spotify.com/track/77lqbary6vt1DSc1MBN6sx', 'Australia', 'top200', 'NEW_ENTRY', 14781, '77lqbary6vt1DSc1MBN6sx', 0.708, 0.793, 5, -5.426, 0, 0.0342, 0.0136, 0.00221, 0.118, 0.734, 122.006, 239418, 4)

Full script:

import sqlite3
import os
import spotipy
import numpy as np
import pandas as pd
from spotipy.oauth2 import SpotifyClientCredentials
from requests.exceptions import ReadTimeout
from datetime import datetime

"""Gets the third Friday of each month and checks that the date exists in the database."""
def date_range_checker(cursor, start_date, end_date):
    # Put in the range for that year. It's till 2021.
    date_range = pd.date_range(start_date, end_date ,freq='WOM-3FRI')

    cursor.execute("""SELECT DISTINCT Date(date) FROM charts""")
    sql_date_fetch = cursor.fetchall()
    sql_dates = [r[0] for r in sql_date_fetch]

    validated_dates = []

    for date in date_range:
        date_str = date.strftime('%Y-%m-%d')
        if date_str in sql_dates:
            validated_dates.append(date_str)
            
    return validated_dates

"""Connects to the database. For each date in validated_dates, it queries all the records with that date. 
Then splits the track IDs from the Spotify link into a new list of tuples. Then for each tuple in that list, it calls the Spotify API with the track ID.
Finally it creates a numpy array for the entire list so the csv converter can be used."""
def main(): 
    now_start = datetime.now()
    start_time = now_start.strftime("%H:%M:%S")
    print(f'Main Function - start time: {start_time}')

    """"This script queries """
    print("working on it...")

    dbname = 'charts.db'

    if os.path.exists(dbname):
        db = sqlite3.connect(dbname, isolation_level=None)
        cursor = db.cursor()

        """"Selects 3rd friday of the month because it takes about 15.8 minutes per day. That's 14.2 hours total to get one friday a month for all 4.5 years.
        Or 18.6 full days of processing for every single day for all 1701 days.
         Fridays are a preferable release day in the industry. Cite this later."""

        # Date range list created and checked in this function
        validated_dates = date_range_checker(cursor, '2017-02-01', '2017-12-31') # change year here

        column_names = ['title', 'rank', 'date', 'artist', 'url', 'region', 'chart', 'trend', 'streams', 'track_id', 'danceability', 
        'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 
        'duration_ms', 'time_signature']
        
        for date_chosen in validated_dates:
            cursor.execute("""SELECT * FROM charts WHERE Date("date") = ?""", (date_chosen,))
            db_result = cursor.fetchall()

            data_with_track_ids = []
            final_data = []

            # Splits ID from Spotify link.
            for entry in db_result:
                track_id = entry[4].split('/')[-1]
                entry += (track_id,)
                data_with_track_ids.append(entry)

            print("I've got all the track IDs. Will start calls to Spotify API now.")

            # Calls to spotify with the new extracted track_id
            for entry in data_with_track_ids:
                track_id = entry[-1]

                try:
                    audio_features = spotify.audio_features(track_id)
                except ReadTimeout:
                    print('Spotify timed out... trying again...')
                    audio_features = spotify.audio_features(track_id)

                entry += (audio_features[0]['danceability'], audio_features[0]['energy'], audio_features[0]['key'], 
                audio_features[0]['loudness'], audio_features[0]['mode'], audio_features[0]['speechiness'], audio_features[0]['acousticness'], 
                audio_features[0]['instrumentalness'], audio_features[0]['liveness'],
                audio_features[0]['valence'], audio_features[0]['tempo'], audio_features[0]['duration_ms'], audio_features[0]['time_signature'])
                
                final_data.append(entry)

            # Build the DataFrame directly from the list of tuples; going through
            # np.array first would coerce every column to strings.
            my_dataframe = pd.DataFrame(final_data, columns=column_names)
            my_dataframe.to_csv(f'spotify_csv_data/spotify_top_200 {date_chosen}.csv')

            now_end = datetime.now()
            end_time = now_end.strftime("%H:%M:%S")
            print(f'Main Function - Start time: {start_time}. End time: {end_time}.')
            print(f'The date {date_chosen} took {now_end - now_start} to run.')
            

    db.close() 



if __name__ == "__main__":
    now_start = datetime.now()
    start_time = now_start.strftime("%H:%M:%S")
    print(f'Script - start time: {start_time}')


    os.environ['SPOTIPY_CLIENT_ID'] = 'ENTER YOUR CLIENT_ID'
    os.environ['SPOTIPY_CLIENT_SECRET'] = 'ENTER YOUR CLIENT_SECRET'

    # retries=10 lets spotipy retry failed requests automatically; this seems to be
    # enough to keep the script from crashing. Leave these settings as-is.
    spotify = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials(), requests_timeout=10, retries=10)
    
    main()

    now_end = datetime.now()
    end_time = now_end.strftime("%H:%M:%S")
    print(f'Script - Start time: {start_time}. End time: {end_time}.')
    print(f'This script took {now_end - now_start} to run.\n')

Profile, profile, profile. But the bottleneck is probably Spotify's API. You could parallelize to speed up the scraping, but they won't thank you for it, and if you do too much of it you may find yourself rate-limited. So profile and see what is actually taking the time, but be prepared to cut down your dataset.

Ask yourself what you can do to speed up the algorithm:

  • Can you take only the top N hits?
  • Do you really need all that data?
  • Is any of the data duplicated?

Even if the data is not duplicated, create a local cache, indexed by track_id, and store every request in it. Instead of requesting from the Spotify endpoint, look the track up in the cache (store the data in another SQLite database, or another table in the same database). If nothing is returned, fetch the data, save it to the cache, and then return it. That way:

  • If you are doing redundant lookups, it will be faster.
  • Even if you aren't, you can re-run your code very quickly (at least relative to your current speed) if you change something and need to run it again.

So: cache, profile, and look at your algorithm.

Some ideas to improve performance:

  1. Use parallel processing

Since you're using Python, the code as written runs in a single process.

Using Python's multiprocessing library, you could (for example) run 4 instances of the same code, with the start/end dates split evenly between them. That could speed up your data processing by roughly 4x. You just need to write the data in a way that doesn't overlap.

Note: if you get rate-limited by the Spotify API (you very likely will), you could use a different API key for each instance (create multiple accounts or borrow a friend's API key).
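A rough sketch of that split (`process_dates` is a placeholder for the existing per-date fetch/CSV pipeline, and the date list is illustrative):

```python
from multiprocessing import Pool

def process_dates(dates):
    """Placeholder worker: the real version would run the per-date pipeline here."""
    return [f"done {d}" for d in dates]

def split_evenly(items, n):
    """Split items into n nearly equal, non-overlapping slices."""
    k, r = divmod(len(items), n)
    out, start = [], 0
    for i in range(n):
        end = start + k + (1 if i < r else 0)
        out.append(items[start:end])
        start = end
    return out

if __name__ == "__main__":
    validated_dates = ["2017-01-20", "2017-02-17", "2017-03-17", "2017-04-21"]
    with Pool(4) as pool:
        # Each worker gets its own slice of dates, so output files never overlap.
        results = pool.map(process_dates, split_evenly(validated_dates, 4))
```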

  2. SQL query optimization

It's worth investigating your queries to see where the problem lies. I'm personally not very familiar with SQL, so this is just to give you some ideas.
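One concrete thing to check: the script filters with `WHERE Date("date") = ?`, and wrapping a column in a function generally prevents SQLite from using an index on it. If the `date` column already stores plain `YYYY-MM-DD` strings (as the sample row suggests), comparing the raw column and indexing it turns the per-date query from a full table scan into an index search. A small self-contained sketch:

```python
import sqlite3

db = sqlite3.connect(":memory:")
cursor = db.cursor()
cursor.execute("CREATE TABLE charts (title TEXT, date TEXT)")
cursor.executemany("INSERT INTO charts VALUES (?, ?)",
                   [("You Were Right", "2017-01-20"), ("Other Song", "2017-01-27")])

# Index the date column once...
cursor.execute("CREATE INDEX IF NOT EXISTS idx_charts_date ON charts(date)")

# ...then compare against the raw column so SQLite can use the index.
# WHERE Date("date") = ? would force a per-row function call and a full scan.
cursor.execute("SELECT title FROM charts WHERE date = ?", ("2017-01-20",))
rows = cursor.fetchall()
```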

  3. Profile your program to learn more

How can you profile a Python script?
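As a quick start, the standard library's cProfile and pstats will show where the time goes (`slow_part` below is a stand-in for the real workload of API calls and DB queries):

```python
import cProfile
import io
import pstats

def slow_part():
    """Stand-in for the real workload (API calls, DB queries)."""
    return sum(i * i for i in range(100_000))

profiler = cProfile.Profile()
profiler.enable()
slow_part()
profiler.disable()

# Print the functions with the most cumulative time.
stream = io.StringIO()
pstats.Stats(profiler, stream=stream).sort_stats("cumulative").print_stats(5)
report = stream.getvalue()
print(report)
```

From the shell, `python -m cProfile -s cumulative your_script.py` gives the same breakdown with no code changes.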

  4. Use some caching technique to avoid redundant API calls and avoid filling in duplicate data (see the last block of code below for a potential solution).


# Splits ID from Spotify link.
for entry in db_result:
    track_id = entry[4].split('/')[-1]
    entry += (track_id,)
    data_with_track_ids.append(entry)

In this code, what type is entry? How big does db_result get?

One more thing worth mentioning about the following code of yours:


# Calls to spotify with the new extracted track_id
for entry in data_with_track_ids:
    track_id = entry[-1]

    try:
        audio_features = spotify.audio_features(track_id)
    except ReadTimeout:
        print('Spotify timed out... trying again...')
        audio_features = spotify.audio_features(track_id)

    entry += (audio_features[0]['danceability'], audio_features[0]['energy'], audio_features[0]['key'], 
    audio_features[0]['loudness'], audio_features[0]['mode'], audio_features[0]['speechiness'], audio_features[0]['acousticness'], 
    audio_features[0]['instrumentalness'], audio_features[0]['liveness'],
    audio_features[0]['valence'], audio_features[0]['tempo'], audio_features[0]['duration_ms'], audio_features[0]['time_signature'])
    
    final_data.append(entry)

In the try-except block, you are making a request for every entry in data_with_track_ids. How many elements does the data_with_track_ids structure hold? If you brute-force the API calls, expect to be throttled and timed out by Spotify's servers.

You should add a short wait after a timeout to reduce the chance of getting rate-limited or IP-banned. Oh wait, it looks like retries are already set up and handled automatically in the spotipy source code when you initialize the spotify variable.

Edit

Here is one way to avoid redundant requests by keeping an in-memory record of the track IDs you have already processed. This can serve as your "cache":

# Calls to Spotify with the extracted track_id, remembering each result so that
# a repeated track_id never triggers a second API call (and its row is still kept).
from spotipy.exceptions import SpotifyException

features_cache = {}  # track_id -> audio-features dict, or None if the lookup failed

for entry in data_with_track_ids:
    track_id = entry[-1]

    if track_id in features_cache:
        print("We have seen this ID before... ID = {}".format(track_id))
    else:
        try:
            # Retries are already built in via the spotify variable in __main__.
            features_cache[track_id] = spotify.audio_features(track_id)[0]
        except SpotifyException as se:
            print('Spotify timed out... maximum retries exceeded... moving on to next track_id...')
            print("TRACK ID IS: {}".format(track_id))
            print("Error details: {}".format(se))
            features_cache[track_id] = None

    features = features_cache[track_id]
    if features is None:
        continue  # no data for this track; skip the row

    entry += (features['danceability'], features['energy'], features['key'],
              features['loudness'], features['mode'], features['speechiness'],
              features['acousticness'], features['instrumentalness'], features['liveness'],
              features['valence'], features['tempo'], features['duration_ms'], features['time_signature'])

    final_data.append(entry)

If the limit is 1000 requests/day, just let the program sleep for 24 hours, or stop the program (saving the current iteration and data context) and run it again once more requests are allowed. See https://developer.spotify.com/documentation/web-api/guides/rate-limits/
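Rather than a flat 24-hour sleep, you can honor the Retry-After header Spotify sends with its 429 responses; recent spotipy versions expose http_status and headers on SpotifyException. A sketch of the idea (the RateLimitError class below is a stand-in so the example runs without spotipy installed; in the real script you would catch SpotifyException instead):

```python
import time

class RateLimitError(Exception):
    """Stand-in for spotipy's SpotifyException, which carries http_status/headers."""
    def __init__(self, http_status, headers=None):
        super().__init__(f"HTTP {http_status}")
        self.http_status = http_status
        self.headers = headers or {}

def fetch_with_backoff(fetch, track_id, max_attempts=3):
    """Call fetch(track_id); on a 429, sleep for Retry-After seconds and try again."""
    for attempt in range(max_attempts):
        try:
            return fetch(track_id)
        except RateLimitError as err:
            if err.http_status == 429 and attempt < max_attempts - 1:
                wait = int(err.headers.get("Retry-After", 5))
                time.sleep(wait)
            else:
                raise
```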

You are calling spotify.audio_features(track_id) for every track, even if you've already fetched its data. Each Friday's results should only introduce a few new songs, but you are re-fetching the info for all 200 of them. Don't do that. Make another database table for song info. After you fetch a track_id's info, write it to the database. Before fetching a track_id's info, check whether you have already stored it in the database. Then you will only make the minimum necessary API calls, instead of 200 * num_weeks * num_countries.
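A sketch of that check-before-fetch pattern (the table and function names here are mine, not from the original script):

```python
import json
import sqlite3

def get_features(db, spotify, track_id):
    """Return cached audio features for track_id, hitting the API only on a miss."""
    cur = db.cursor()
    cur.execute("""CREATE TABLE IF NOT EXISTS audio_features
                   (track_id TEXT PRIMARY KEY, features_json TEXT)""")
    cur.execute("SELECT features_json FROM audio_features WHERE track_id = ?", (track_id,))
    row = cur.fetchone()
    if row is not None:
        return json.loads(row[0])                   # already stored: no API call
    features = spotify.audio_features(track_id)[0]  # first sighting: fetch once
    cur.execute("INSERT INTO audio_features VALUES (?, ?)",
                (track_id, json.dumps(features)))
    db.commit()
    return features
```

Because the cache lives in the database, re-runs of the script also skip every track fetched on a previous run.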