pyspark 如何记住内存中的某些内容,例如 mapreduce 中的 class 属性?

How can pyspark remember something in memory like class attributes in mapreduce?

我有一个包含 2 列的 table:image_urlcomment。同一张图片可能有很多条评论,文件中数据按image_url排序

我需要抓取图像,并将其转换为二进制文件。这将需要很长时间。所以,对于同一张图片,我只想做一次。

在 mapreduce 中,我可以记住最后一行并在内存中得到结果。

class Mapper:
  def __init__(self):
    self.image_url = None
    self.image_bin = None
  def run(self, image_url, comment):
     if image_url != self.image_url:
       self.image_url = image_url
       self.image_bin = process(image_url)
     return self.image_url, self.image_bin, comment

我怎样才能在 pyspark 中做到这一点? rdd和dataframe都可以。

我建议您只处理分组版本的数据框。像这样:

from pyspark.sql import functions as F

# Assuming df is your dataframe
df = df.groupBy("image_url").agg(F.collect_list("comment").alias("comments"))

df = df.withColumn("image_bin", process(F.col("image_url")))

df.select(
    "image_url",
    "image_bin",
    F.explode("comments").alias("comment"),
).show()

我发现 mapPartitions 有效。代码如下所示。

def do_cover_partition(partitionData):
    last_url = None
    last_bin = None
    for row in partitionData:
        data = row.asDict()
        print(data)
        if data['cover_url'] != last_url:
            last_url = data['cover_url']
            last_bin = url2bin(last_url)
            print(data['comment'])
        data['frames'] = last_bin
        yield data

columns = ["id","cover_url","comment","frames"]
df = df.rdd.mapPartitions(do_cover_partition).map(lambda x: [x[c] for c in columns]).toDF(columns)