How to save UploadFile in FastAPI
I accept a file via POST. When I save it locally, I can read its content using file.read(), but the name shown via file.name is incorrect (16). When I try to find the file by that name, I get an error. What might be the problem?
My code:
@router.post(
    path="/po/{id_po}/upload",
    response_model=schema.ContentUploadedResponse,
)
async def upload_file(
        id_po: int,
        background_tasks: BackgroundTasks,
        uploaded_file: UploadFile = File(...)):
    """pass"""
    uploaded_file.file.rollover()
    uploaded_file.file.flush()
    # shutil.copy(uploaded_file.file.name, f'/home/fyzzy/Desktop/api/{uploaded_file.filename}')
    background_tasks.add_task(s3_upload, uploaded_file=fp)
    return schema.ContentUploadedResponse()
Background
UploadFile is just a wrapper around SpooledTemporaryFile, which can be accessed as UploadFile.file.
SpooledTemporaryFile() [...] function operates exactly as TemporaryFile() does
The documentation about TemporaryFile says:
Return a file-like object that can be used as a temporary storage area. [..] It will be destroyed as soon as it is closed (including an implicit close when the object is garbage collected). Under Unix, the directory entry for the file is either not created at all or is removed immediately after the file is created. Other platforms do not support this; your code should not rely on a temporary file created using this function having or not having a visible name in the file system.
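A minimal sketch (outside FastAPI) of what that means in practice: on Linux, TemporaryFile() usually creates a file with no directory entry at all, and its .name attribute is just the numeric file descriptor, which is most likely where the "16" in the question comes from.

import tempfile

# On Linux, TemporaryFile() typically opens an unnamed file (O_TMPFILE),
# so .name is the file descriptor number, not a filesystem path.
with tempfile.TemporaryFile() as tmp:
    tmp.write(b"hello")
    print(tmp.name)  # e.g. 16 -- there is no path you could reopen or copy from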
async def endpoints
You should use the following async methods of UploadFile: write, read, seek and close. They are executed in a thread pool and awaited asynchronously.
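As an illustrative sketch only (the /inspect/ route and its behaviour are invented for this example), awaiting those methods inside an endpoint looks like this:

from fastapi import FastAPI, File, UploadFile

app = FastAPI()

@app.post("/inspect/")
async def inspect(in_file: UploadFile = File(...)):
    head = await in_file.read(16)  # async read of the first 16 bytes
    await in_file.seek(0)          # async rewind so the file can be re-read
    await in_file.close()          # async close of the underlying temp file
    return {"first_bytes": head.hex()}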
To write the file out to disk asynchronously, you can use aiofiles. Example:
@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
# ...
async with aiofiles.open(out_file_path, 'wb') as out_file:
content = await in_file.read() # async read
await out_file.write(content) # async write
return {"Result": "OK"}
Or, in a chunked manner, so as not to load the entire file into memory:
@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
# ...
async with aiofiles.open(out_file_path, 'wb') as out_file:
while content := await in_file.read(1024): # async read chunk
await out_file.write(content) # async write chunk
return {"Result": "OK"}
def endpoints
Also, I would like to cite several useful utility functions from this topic (all credits to @dmontagu) using shutil.copyfileobj with the internal UploadFile.file:
import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Callable

from fastapi import UploadFile


def save_upload_file(upload_file: UploadFile, destination: Path) -> None:
    try:
        with destination.open("wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
    finally:
        upload_file.file.close()


def save_upload_file_tmp(upload_file: UploadFile) -> Path:
    try:
        suffix = Path(upload_file.filename).suffix
        with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            shutil.copyfileobj(upload_file.file, tmp)
        tmp_path = Path(tmp.name)
    finally:
        upload_file.file.close()
    return tmp_path


def handle_upload_file(
    upload_file: UploadFile, handler: Callable[[Path], None]
) -> None:
    tmp_path = save_upload_file_tmp(upload_file)
    try:
        handler(tmp_path)  # Do something with the saved temp file
    finally:
        tmp_path.unlink()  # Delete the temp file
Note: you'd want to use the above functions inside of def endpoints, not async def, since they make use of blocking APIs.
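A minimal usage sketch under assumptions (the /import/ route and the /tmp/uploads destination are made up, and save_upload_file from the snippet above is assumed to be in scope); note the plain def endpoint, which FastAPI runs in a thread pool so the blocking file I/O does not block the event loop:

from pathlib import Path

from fastapi import FastAPI, File, UploadFile

app = FastAPI()

UPLOAD_DIR = Path("/tmp/uploads")  # hypothetical destination directory


@app.post("/import/")
def import_file(upload_file: UploadFile = File(...)):
    # Plain `def`: FastAPI executes this endpoint in a thread pool,
    # so the blocking shutil-based helper is fine here.
    UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
    save_upload_file(upload_file, UPLOAD_DIR / upload_file.filename)
    return {"saved": upload_file.filename}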
You can save the uploaded file like this,
from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):
    file_location = f"files/{uploaded_file.filename}"
    with open(file_location, "wb+") as file_object:
        file_object.write(uploaded_file.file.read())
    return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}
This is practically the same as using the shutil.copyfileobj(...) method. So, the above function can be re-written as,
import shutil

from fastapi import FastAPI, File, UploadFile

app = FastAPI()


@app.post("/upload-file/")
async def create_upload_file(uploaded_file: UploadFile = File(...)):
    file_location = f"files/{uploaded_file.filename}"
    with open(file_location, "wb+") as file_object:
        shutil.copyfileobj(uploaded_file.file, file_object)
    return {"info": f"file '{uploaded_file.filename}' saved at '{file_location}'"}
In my case, I need to handle huge files, so I must avoid reading them all into memory. What I want is to save them to disk asynchronously, in chunks.
I'm experimenting with this and it seems to do the job (CHUNK_SIZE was chosen rather arbitrarily; further tests are needed to find the optimal size):
import os
import logging

from fastapi import FastAPI, BackgroundTasks, File, UploadFile

log = logging.getLogger(__name__)

app = FastAPI()

DESTINATION = "/"
CHUNK_SIZE = 2 ** 20  # 1MB


async def chunked_copy(src, dst):
    await src.seek(0)
    with open(dst, "wb") as buffer:
        while True:
            contents = await src.read(CHUNK_SIZE)
            if not contents:
                log.info("Src completely consumed\n")
                break
            log.info(f"Consumed {len(contents)} bytes from Src file\n")
            buffer.write(contents)


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    fullpath = os.path.join(DESTINATION, file.filename)
    await chunked_copy(file, fullpath)
    return {"File saved to disk at": fullpath}
However, I quickly realized that create_upload_file is not invoked until the file has been completely received. So, if this code snippet is correct, it will probably be beneficial for performance, but it will not enable anything like providing feedback to the client about the progress of the upload, and it will perform a full data copy in the server. It seems silly not to be able to just access the original UploadFile temporary file, flush it and move it somewhere else, thus avoiding a copy.
You can save the file by copying and pasting the below code.
import aiofiles
from fastapi import (
    FastAPI,
    UploadFile,
    File,
    status
)
from fastapi.responses import JSONResponse

app = FastAPI(debug=True)


@app.post("/upload_file/")
async def result(file: UploadFile = File(...)):
    try:
        async with aiofiles.open(file.filename, 'wb') as out_file:
            content = await file.read()  # async read
            await out_file.write(content)  # async write
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'message': str(e)}
        )
    else:
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"result": 'success'}
        )
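To try it out, a client-side sketch could look like the following (assuming the server above runs at http://127.0.0.1:8000, the requests library is installed, and report.pdf is a made-up file name). The form field must be called file, matching the endpoint parameter:

import requests

# The form field name "file" must match the UploadFile parameter name.
with open("report.pdf", "rb") as f:
    resp = requests.post(
        "http://127.0.0.1:8000/upload_file/",
        files={"file": ("report.pdf", f, "application/pdf")},
    )
print(resp.status_code, resp.json())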
If you want to upload multiple files, then copy and paste the below code:
import aiofiles
from typing import List

from fastapi import (
    FastAPI,
    UploadFile,
    File,
    status,
    Depends
)
from fastapi.responses import JSONResponse

app = FastAPI(debug=True)


@app.post("/upload_multiple_file/")
async def result(files: List[UploadFile] = File(...),
                 secret_key: str = Depends(secretkey_middleware)):
    # `secretkey_middleware` and `eventid` come from the author's own project;
    # replace or remove them for your application.
    try:
        for file in files:
            async with aiofiles.open(eventid + file.filename, 'wb') as out_file:
                content = await file.read()
                await out_file.write(content)
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'message': str(e)}
        )
    else:
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"result": 'result'}
        )
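For the multi-file endpoint, a hedged client-side sketch (again assuming requests and a local server; any authentication enforced by secretkey_middleware is omitted here, and the file names are made up) repeats the files form field once per file:

import requests

# Each tuple repeats the form field name "files", matching the
# List[UploadFile] parameter of the endpoint above.
with open("a.txt", "rb") as f1, open("b.txt", "rb") as f2:
    resp = requests.post(
        "http://127.0.0.1:8000/upload_multiple_file/",
        files=[
            ("files", ("a.txt", f1, "text/plain")),
            ("files", ("b.txt", f2, "text/plain")),
        ],
    )
print(resp.status_code, resp.json())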