高效地批处理 Spark 数据帧以调用 API

Efficiently batching Spark dataframes to call an API

我是 Spark 的新手,我正在尝试使用 Spotipy 调用 Spotify API。我有一个艺术家 ID 列表,可用于获取艺术家信息。 Spotify API 允许一次批量调用最多 50 个 ID。我从 MySQL 数据库加载艺术家 ID 并将它们存储在数据框中。

我现在的问题是我不知道如何有效地将数据框分成 50 行或更少的行。

在下面的示例中,我将数据帧转换为常规 Python 列表,我可以从中调用 API 50 个批次。

知道如何在不返回 Python 列表的情况下执行此操作吗?

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from pyspark.sql import SparkSession
import os

spark = SparkSession\
        .builder\
        .appName("GetArtists")\
        .getOrCreate()

df = spark.read.format('jdbc') \
    .option("url", "jdbc:mysql://"+os.getenv("DB_SERVER")+":"+os.getenv("DB_PORT")+"/spotify_metadata")\
    .option("user", os.getenv("DB_USER"))\
    .option("password", os.getenv("DB_PW"))\
    .option("query", "SELECT artist_id FROM artists")\
    .load()

sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

ids = [row['artist_id'] for row in df.collect()]

batch_size = 50
for i in range(0,len(ids), batch_size):
    artists = sp.artists( ids[i:i+batch_size] )

    # process the JSON response

我考虑过使用 foreach 并为每个 ID 调用 API,但这会导致不必要的请求。结果也存储回数据库中,这意味着我正在将许多单行写入数据库。

如果你想根据行号划分数据框,你可以这样做:

from pyspark.sql import functions as f
from pyspark.sql import Window

df = df.withColumn('row_num', f.row_number().over(Window.orderBy(f.lit(1))))
len = df.count()

for i in range(0,len, 50):
    df = df.filter(f.col('row_num')>=i & f.col('row_num')<=i+50)
    #api logic goes here

但是如果你可以将 df 直接传递给 api 然后传递 df 或收集 df 每次只有 50 个值。