Spark:如何将单个行作为 JPG 写入 S3/HDFS
Spark: How do I write individual row to S3 / HDFS as JPG
我必须将数据作为单个 JPG 文件(约数百万)从 PySpark 写入 S3 存储桶。
我尝试了多种选择:
设置:AWS EMR 集群和 Jupyter notebook。
- 在 'foreach' 方法中创建一个 boto3 客户端并写入 S3 ==> 我们为每个任务打开客户端时速度太慢且效率低下。
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
client = boto3.client('s3')
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test', Key='out_images/'+file_name)
myRdd.foreach(get_image)
- 写入本地文件系统和 运行 到 S3 的“aws S3 副本” => 如果数据写入每个单独的工作节点的卷,则不清楚如何访问此数据。在作业 运行ning 时登录工作节点,但无法准确找到 JPG 写入的位置。
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)
- 稍后写入 HDFS 和 运行 s3-dist-cp。可能是最有效的,但尚未成功使用代码。
I get path cannot be found exceptions
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "hdfs://" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)
有人可以提出实现此目标的好方法吗?
如果将 foreach
替换为 foreachPartition,则解决方案 1 效果很好。此更改后,每个分区仅创建一个客户端:
def get_image(y_it):
client = boto3.client('s3')
for y in y_it:
img_url = ...
cid = ...
res = requests.get(img_url, stream=True)
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test', Key='out_images/'+file_name)
myRdd.foreachPartition(get_image)
在 y_it
的循环中,重复使用同一个客户端。
如果requests.Sessions are used for the http call as described in this answer,事情甚至可以变得更快。在这种情况下,单个 http 会话在 y_it
循环外创建(如客户端),然后在循环内重复使用。
我必须将数据作为单个 JPG 文件(约数百万)从 PySpark 写入 S3 存储桶。
我尝试了多种选择:
设置:AWS EMR 集群和 Jupyter notebook。
- 在 'foreach' 方法中创建一个 boto3 客户端并写入 S3 ==> 我们为每个任务打开客户端时速度太慢且效率低下。
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
client = boto3.client('s3')
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test', Key='out_images/'+file_name)
myRdd.foreach(get_image)
- 写入本地文件系统和 运行 到 S3 的“aws S3 副本” => 如果数据写入每个单独的工作节点的卷,则不清楚如何访问此数据。在作业 运行ning 时登录工作节点,但无法准确找到 JPG 写入的位置。
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "./" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)
- 稍后写入 HDFS 和 运行 s3-dist-cp。可能是最有效的,但尚未成功使用代码。
I get path cannot be found exceptions
def get_image(y):
res = requests.get(img_url, stream=True)
file_name = "hdfs://" +str(cid) + ".jpg"
with open(file_name, 'wb') as f:
f.write(res.content)
myRdd.foreach(get_image)
有人可以提出实现此目标的好方法吗?
如果将 foreach
替换为 foreachPartition,则解决方案 1 效果很好。此更改后,每个分区仅创建一个客户端:
def get_image(y_it):
client = boto3.client('s3')
for y in y_it:
img_url = ...
cid = ...
res = requests.get(img_url, stream=True)
file_name = str(cid) + ".jpg"
client.put_object(Body=res.content, Bucket='test', Key='out_images/'+file_name)
myRdd.foreachPartition(get_image)
在 y_it
的循环中,重复使用同一个客户端。
如果requests.Sessions are used for the http call as described in this answer,事情甚至可以变得更快。在这种情况下,单个 http 会话在 y_it
循环外创建(如客户端),然后在循环内重复使用。