是否可以允许用户在 FastAPI 或 Flask 中下载 pyspark 数据帧的结果

Is it possible to allow users to download the result of a pyspark dataframe in FastAPI or Flask

我正在开发一个使用 FastAPI 的 API,用户可以向其发出请求以执行以下操作:

  1. 首先,获取请求将从 Google 云存储中获取一个文件并将其加载到 pyspark DataFrame
  2. 然后应用程序将对 DataFrame 执行一些转换
  3. 最后,我想将DataFrame作为parquet文件写入用户的磁盘。

我不太明白如何以 parquet 格式将文件传送给用户,原因如下:

这在 FastAPI 中甚至可能吗?是否可以在 Flask 中使用 send_file()?

这是我目前的代码。请注意,我已经尝试了一些方法,例如注释代码,但都无济于事。

import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/sample-data/my.parquet')

@router.get("/applications")
def applications():
    df.write.parquet("temp.parquet", compression="snappy")
    return FileResponse("part-some-compressed-file.snappy.parquet")
    # with tempfile.TemporaryFile() as f:
    #     f.write(df.rdd.saveAsPickleFile("temp.parquet"))
    #     return FileResponse("test.parquet")

谢谢!

编辑: 我尝试使用提供的答案和信息 ,但我无法完全使用它。

我能够解决这个问题,但它远非优雅。如果有人能提供不写入磁盘的解决方案,我将不胜感激,并将 select 您的答案作为正确答案。

我能够使用 df.rdd.saveAsPickleFile() 序列化 DataFrame,压缩生成的目录,将其传递给 python 客户端,将生成的压缩文件写入磁盘,解压缩,然后使用 SparkContext().pickleFile 在最终加载 DataFrame 之前。我认为远非理想。

API:

import shutil
import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse


router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/my-file.parquet')

@router.get("/applications")
def applications():
    temp_parquet = tempfile.NamedTemporaryFile()
    temp_parquet.close()
    df.rdd.saveAsPickleFile(temp_parquet.name)

    shutil.make_archive('test', 'zip', temp_parquet.name)

    return FileResponse('test.zip')

客户:

import io
import zipfile

import requests

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

response = requests.get("http://0.0.0.0:5000/applications")
file_like_object = io.BytesIO(response.content)
with zipfile.ZipFile(file_like_object) as z:
    z.extractall('temp.data')

rdd = sc.pickleFile("temp.data")
df = spark.createDataFrame(rdd)

print(df.head())