pyspark 如何记住内存中的某些内容,例如 mapreduce 中的 class 属性?
How can pyspark remember something in memory like class attributes in mapreduce?
我有一个包含 2 列的 table:image_url
、comment
。同一张图片可能有很多条评论,文件中数据按image_url
排序
我需要抓取图像,并将其转换为二进制文件。这将需要很长时间。所以,对于同一张图片,我只想做一次。
在 mapreduce 中,我可以记住最后一行并在内存中得到结果。
class Mapper:
def __init__(self):
self.image_url = None
self.image_bin = None
def run(self, image_url, comment):
if image_url != self.image_url:
self.image_url = image_url
self.image_bin = process(image_url)
return self.image_url, self.image_bin, comment
我怎样才能在 pyspark 中做到这一点? rdd和dataframe都可以。
我建议您只处理分组版本的数据框。像这样:
from pyspark.sql import functions as F
# Assuming df is your dataframe
df = df.groupBy("image_url").agg(F.collect_list("comment").alias("comments"))
df = df.withColumn("image_bin", process(F.col("image_url")))
df.select(
"image_url",
"image_bin",
F.explode("comments").alias("comment"),
).show()
我发现 mapPartitions
有效。代码如下所示。
def do_cover_partition(partitionData):
last_url = None
last_bin = None
for row in partitionData:
data = row.asDict()
print(data)
if data['cover_url'] != last_url:
last_url = data['cover_url']
last_bin = url2bin(last_url)
print(data['comment'])
data['frames'] = last_bin
yield data
columns = ["id","cover_url","comment","frames"]
df = df.rdd.mapPartitions(do_cover_partition).map(lambda x: [x[c] for c in columns]).toDF(columns)
我有一个包含 2 列的 table:image_url
、comment
。同一张图片可能有很多条评论,文件中数据按image_url
排序
我需要抓取图像,并将其转换为二进制文件。这将需要很长时间。所以,对于同一张图片,我只想做一次。
在 mapreduce 中,我可以记住最后一行并在内存中得到结果。
class Mapper:
def __init__(self):
self.image_url = None
self.image_bin = None
def run(self, image_url, comment):
if image_url != self.image_url:
self.image_url = image_url
self.image_bin = process(image_url)
return self.image_url, self.image_bin, comment
我怎样才能在 pyspark 中做到这一点? rdd和dataframe都可以。
我建议您只处理分组版本的数据框。像这样:
from pyspark.sql import functions as F
# Assuming df is your dataframe
df = df.groupBy("image_url").agg(F.collect_list("comment").alias("comments"))
df = df.withColumn("image_bin", process(F.col("image_url")))
df.select(
"image_url",
"image_bin",
F.explode("comments").alias("comment"),
).show()
我发现 mapPartitions
有效。代码如下所示。
def do_cover_partition(partitionData):
last_url = None
last_bin = None
for row in partitionData:
data = row.asDict()
print(data)
if data['cover_url'] != last_url:
last_url = data['cover_url']
last_bin = url2bin(last_url)
print(data['comment'])
data['frames'] = last_bin
yield data
columns = ["id","cover_url","comment","frames"]
df = df.rdd.mapPartitions(do_cover_partition).map(lambda x: [x[c] for c in columns]).toDF(columns)