Spark:如何将单个行作为 JPG 写入 S3/HDFS

Spark: How do I write individual row to S3 / HDFS as JPG

我必须将数据作为单个 JPG 文件(约数百万)从 PySpark 写入 S3 存储桶。

我尝试了多种选择:

设置:AWS EMR 集群和 Jupyter notebook。

  1. 在 'foreach' 方法中创建一个 boto3 客户端并写入 S3 ==> 我们为每个任务打开客户端时速度太慢且效率低下。

def get_image(y):
    res = requests.get(img_url, stream=True)
    file_name = "./" +str(cid) + ".jpg"
    client = boto3.client('s3')
    file_name = str(cid) + ".jpg"
    client.put_object(Body=res.content, Bucket='test',  Key='out_images/'+file_name)

myRdd.foreach(get_image)

  1. 写入本地文件系统和 运行 到 S3 的“aws S3 副本” => 如果数据写入每个单独的工作节点的卷,则不清楚如何访问此数据。在作业 运行ning 时登录工作节点,但无法准确找到 JPG 写入的位置。

def get_image(y):
    res = requests.get(img_url, stream=True)
    file_name = "./" +str(cid) + ".jpg"
    with open(file_name, 'wb') as f:
        f.write(res.content)

myRdd.foreach(get_image)

  1. 稍后写入 HDFS 和 运行 s3-dist-cp。可能是最有效的,但尚未成功使用代码。 I get path cannot be found exceptions

def get_image(y):
    res = requests.get(img_url, stream=True)
    file_name = "hdfs://" +str(cid) + ".jpg"
    with open(file_name, 'wb') as f:
        f.write(res.content)

myRdd.foreach(get_image)

有人可以提出实现此目标的好方法吗?

如果将 foreach 替换为 foreachPartition,则解决方案 1 效果很好。此更改后,每个分区仅创建一个客户端:

def get_image(y_it):
    client = boto3.client('s3')
    for y in y_it:
        img_url = ...
        cid = ...
        res = requests.get(img_url, stream=True)
        file_name = str(cid) + ".jpg"
        client.put_object(Body=res.content, Bucket='test',  Key='out_images/'+file_name)

myRdd.foreachPartition(get_image)

y_it 的循环中,重复使用同一个客户端。

如果requests.Sessions are used for the http call as described in this answer,事情甚至可以变得更快。在这种情况下,单个 http 会话在 y_it 循环外创建(如客户端),然后在循环内重复使用。