如何每天只遍历 RDD 的一部分?

How to use traverse just a section of a RDD per day?

这道题是关于代码设计的。如何在今天迭代 RDD 的一部分,在第二天迭代另一部分。

我已经构建了 20,000,000 行的 RDD 或 spark 数据框。我想从 lbs.amap.com 调用一个 API,但它每天只能被访问 300,000 次。

def gd_address(line):
    # GET rest api, return a list of values
    ...
# use these values to add columns to my RDD
df.rdd.map(lambda line: (line[0], line[1], gd_address(line)[0], gd_address(line)[1], gd_address(line)[2]), True)

程序刚遍历300,000行就停止,然后第二天再遍历下300,000行就停止,然后遍历整个RDD,怎么写?任何想法将不胜感激。

正如已经与@Glennie 讨论的那样,这里的关键点是使用唯一的增量行 ID。这意味着新数据将具有递增的增量 id,而旧数据的 id 应保持不变。换句话说,我们需要确保一个特定的记录在每次作业执行时都具有相同的对应 ID。为了创建这样的唯一 ID,您可以使用通过 RDD API 提供的 zipWithIndex。与 monotonically_increasing_id 相反,zipWithIndex 函数确保行 ID 的顺序值。这将对您的程序的性能发挥重要作用,因为正如我们将在下面看到的那样,它可以有效地减少您需要处理的行数。这是 zipWithIndex 方法的实现:

df.rdd.zipWithIndex() \
           .map(lambda (line, row_id): (row_id, line[0], line[1], gd_address(line)[0], gd_address(line)[1], gd_address(line)[2])) \
           .toDF(["row_id", "c1", "c2", "c3", "c4", "c5"])

第二个先决条件以及唯一行 ID 的存在是根据我们刚刚创建的行 ID 对数据进行排序。那么对于第一天,所需范围将为 0 - 299.000,对于第二天,对于 300.000 - 599.999,对于第三天,则为 600.000 - 899.999,依此类推。在检索和处理每个块后,您需要存储最后处理的行 ID。您可以选择将最后一个 ID 保存在文件系统或 HDFS 中。写入 HDFS 的一种方法是通过
df.select("max(row_id)").write.text("hdfs://cluster/user/hdfs/last_row_id.txt") 并阅读 spark.read.text("hdfs://cluster/user/hdfs/last_row_id.txt").

完整代码如下:

def callAmapAPI(data):
   for row in data:
     # make HTTP call here

# assign row id to df 
df = df.rdd.zipWithIndex() \
           .map(lambda (line, row_id): (row_id, line[0], line[1], gd_address(line)[0], gd_address(line)[1], gd_address(line)[2])) \
           .toDF(["row_id", "c1", "c2", "c3", "c4", "c5"]) 

# retrieve saved row id
lower_bound_rowid = spark.read.text("hdfs://cluster/user/hdfs/last_row_id.txt").first()[0]

chunk_size = 300000
upper_bound_rowid = lower_bound_id + chunk_size
partition_num = 8

# we restrict the number of rows to 300000 based on upper and lower bound
filtered_df = df.where(df["row_id"] > lower_bound_rowid &  df["row_id"] <= upper_bound_rowid) \
# optional allows more control to simultaneous calls to amap API i.e 8 concurrent HTTP calls
.repartition(partition_num, "row_id") \
.orderBy("row_id")

# call API for each partition
filtered_df.foreachPartition(callAmapAPI)

# save max row id for next day
filtered_df.select("max(row_id)") \
.write.mode('overwrite') \
.text("hdfs://cluster/user/hdfs/last_row_id.txt")

第二种方法使用 monotonically_increasing_id(不建议)

至于使用 monotonically_increasing_id 的方法,我相信只有当您的数据集保持不变(没有新行)时才有效,否则无法保证 generated_id 将保持不变每行都相同,因此您将无法跟踪最后处理的记录(Spark 可能会为同一记录生成不同的 ID)。尽管如果是这种情况并且 df 没有改变,那么您只能调用 monotonically_increasing_id() 一次并保存添加了新 ID 的 df。在这种情况下,您将需要进行接下来的两项更改。首先将 df 定义更改为:

df = df.withColumn("row_id", monotonically_increasing_id())
df.write.csv(...) # or some other storage

上面的代码片段应该只执行一次,这与之前在每次作业执行时计算和分配行 ID 的方法相反。

然后将 filtered_df 定义更改为:

df = spark.read.csv(...) # retrieve the dataset with monotonically_increasing_id

filtered_df = df.where(df["row_id"] > lower_bound_rowid) \
.orderBy("row_id") \
.limit(chunk_size) \
.repartition(partition_num, "row_id")

这里有两点需要注意。首先,我们不知道 upper_bound_rowid(monotonically_increasing_id 将为每个分区生成任意 id),因此 upper_bound_rowid 未在 where 子句中使用。其次 orderBy 应该出现在 limit 之前,否则我们无法确保 topN 行。由于 orderBy 是在更大的数据集上执行的,因此这种方法的性能也可能较慢。