如何将 base64 格式的音频文件转换为 .wav 文件而不将它们存储在 Python 的当前目录中?
How to convert base64-format audio files into .wav files without storage them on current directory in Python?
我想知道是否有更好的方法可以将 base64 格式的音频文件转换为 .wav 文件而不将它们存储在当前目录中。
问题是我从用户上传的 POST 请求中获取了 base64 格式的音频文件我为预处理和转录文件而创建的函数,这些函数使用 .wav 文件的 wave 模块。由于我创建 .wav 文件只是为了转录它们,所以我不需要存储它们,最后我用 os.unlink 函数删除了它们。
import fastapi_server.preprocessing_f as pr
app = FastAPI()
class AudioBase64(BaseModel):
audio_name: str = Field(..., min_length=1, example="my-audio")
data_base64: str = Field(..., min_length=1)
@app.post(
path="/upload-base64-audios/",
status_code=status.HTTP_200_OK
)
async def upload_base64_audios(audios: list[AudioBase64] = Body(...)):
model: str = "~/models"
dir_name = os.path.expanduser(model)
output_graph, scorer = pr.resolve_models(dir_name)
model_retval: List[str] = pr.load_model(output_graph, scorer)
all_names: list[str] = []
all_datas: list[str] = []
all_decode: list[str] = []
aggresive = 1
transcriptions: list[str] = []
new_data: list[str] = []
final_data: list[str] = []
header: list[str] = ["audio_name", "transcriptions"]
for i in range(len(audios)):
name = audios[i].audio_name
data = audios[i].data_base64
decode = base64.b64decode(data)
all_names.append(name)
all_datas.append(data)
all_decode.append(decode)
filename = "%s.wav" % name
with open(filename, "wb") as f:
f.write(decode)
cwd = os.getcwd()
files = glob.glob(cwd + "/" + name + ".wav")
segments, sample_rate, audio_length = pr.vad_segment_generator(
files[0], aggresive
)
for k, segment in enumerate(segments):
audio = np.frombuffer(segment, dtype=np.int16)
output = pr.stt(model_retval[0], audio)
transcript = output[0]
transcriptions.append(transcript)
new_data = [all_names[i], transcriptions[i]]
final_data.append(new_data)
dir_files = glob.glob(cwd + "/*.wav")
for file in dir_files:
os.unlink(file)
new_df = pd.DataFrame(final_data, columns=header)
stream = io.StringIO()
new_df.to_csv(stream, index=False)
response: Response = StreamingResponse(
iter([stream.getvalue()]), media_type="text/csv"
)
response.headers["Content-Disposition"] = "attachment; filename=my-file.csv"
return response
作为,
尝试中间写入一个 io.BytesIO
,但写入后,调用 .seek(0)
到 return 蒸汽位置到开始,而不是调用 getbuffer()
(写入后流位置会在最后,准备更多数据)
with io.BytesIO() as buffer:
buffer.write(decode)
buffer.seek(0) # rewind stream
...
segments, sample_rate, audio_length = pr.vad_segment_generator(
buffer, aggresive)
我想知道是否有更好的方法可以将 base64 格式的音频文件转换为 .wav 文件而不将它们存储在当前目录中。
问题是我从用户上传的 POST 请求中获取了 base64 格式的音频文件我为预处理和转录文件而创建的函数,这些函数使用 .wav 文件的 wave 模块。由于我创建 .wav 文件只是为了转录它们,所以我不需要存储它们,最后我用 os.unlink 函数删除了它们。
import fastapi_server.preprocessing_f as pr
app = FastAPI()
class AudioBase64(BaseModel):
audio_name: str = Field(..., min_length=1, example="my-audio")
data_base64: str = Field(..., min_length=1)
@app.post(
path="/upload-base64-audios/",
status_code=status.HTTP_200_OK
)
async def upload_base64_audios(audios: list[AudioBase64] = Body(...)):
model: str = "~/models"
dir_name = os.path.expanduser(model)
output_graph, scorer = pr.resolve_models(dir_name)
model_retval: List[str] = pr.load_model(output_graph, scorer)
all_names: list[str] = []
all_datas: list[str] = []
all_decode: list[str] = []
aggresive = 1
transcriptions: list[str] = []
new_data: list[str] = []
final_data: list[str] = []
header: list[str] = ["audio_name", "transcriptions"]
for i in range(len(audios)):
name = audios[i].audio_name
data = audios[i].data_base64
decode = base64.b64decode(data)
all_names.append(name)
all_datas.append(data)
all_decode.append(decode)
filename = "%s.wav" % name
with open(filename, "wb") as f:
f.write(decode)
cwd = os.getcwd()
files = glob.glob(cwd + "/" + name + ".wav")
segments, sample_rate, audio_length = pr.vad_segment_generator(
files[0], aggresive
)
for k, segment in enumerate(segments):
audio = np.frombuffer(segment, dtype=np.int16)
output = pr.stt(model_retval[0], audio)
transcript = output[0]
transcriptions.append(transcript)
new_data = [all_names[i], transcriptions[i]]
final_data.append(new_data)
dir_files = glob.glob(cwd + "/*.wav")
for file in dir_files:
os.unlink(file)
new_df = pd.DataFrame(final_data, columns=header)
stream = io.StringIO()
new_df.to_csv(stream, index=False)
response: Response = StreamingResponse(
iter([stream.getvalue()]), media_type="text/csv"
)
response.headers["Content-Disposition"] = "attachment; filename=my-file.csv"
return response
作为io.BytesIO
,但写入后,调用 .seek(0)
到 return 蒸汽位置到开始,而不是调用 getbuffer()
(写入后流位置会在最后,准备更多数据)
with io.BytesIO() as buffer:
buffer.write(decode)
buffer.seek(0) # rewind stream
...
segments, sample_rate, audio_length = pr.vad_segment_generator(
buffer, aggresive)