How can I bulk/batch transcribe wav files using Python?
I am trying to transcribe multiple files in a folder with my Python app and speed the process up.
At the moment I can process one file at a time -
####RUN THIS PART FIRST#########
import json
from os.path import join, dirname
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
import threading
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
import pandas as pd
authenticator = IAMAuthenticator('xxyyzz')
service = SpeechToTextV1(authenticator=authenticator)
service.set_service_url('https://api.us-east.speech-to-text.watson.cloud.ibm.com')
models = service.list_models().get_result()
#print(json.dumps(models, indent=2))
model = service.get_model('en-US_BroadbandModel').get_result()
#print(json.dumps(model, indent=2))
# This is the name of the file you need to change below
with open(join(dirname('__file__'), 'Call 8.wav'), 'rb') as audio_file:
    output = service.recognize(
        audio=audio_file,
        speaker_labels=True,
        content_type='audio/wav',
        # timestamps=True,
        # word_confidence=True,
        inactivity_timeout=-1,
        model='en-US_NarrowbandModel',
        continuous=True).get_result(),
    # NOTE: the trailing comma above leaves `output` as a one-element tuple,
    # which the comprehensions below rely on when they iterate over it.
############END################################
# get data to a csv
########################RUN THIS PART SECOND#####################################
df0 = pd.DataFrame([i for elts in output for alts in elts['results'] for i in alts['alternatives']])
df1 = pd.DataFrame([i for elts in output for i in elts['speaker_labels']])
list(df0.columns)
list(df1.columns)
df0 = df0.drop(["timestamps"], axis=1)
df1 = df1.drop(["final"], axis=1)
df1 = df1.drop(['confidence'],axis=1)
test3 = pd.concat([df0, df1], axis=1)
#sentiment
transcript = test3['transcript']
transcript = transcript.dropna()
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
analyzer = SentimentIntensityAnalyzer()
text = transcript
scores = []
for txt in text:
vs = analyzer.polarity_scores(txt)
scores.append(vs)
data = pd.DataFrame(text, columns= ['Text'])
data2 = pd.DataFrame(scores)
final_dataset= pd.concat([data,data2], axis=1)
test4 = pd.concat([test3,final_dataset], axis=1)
test4 = test4.drop(['Text'],axis=1)
test4.rename(columns={'neg':'Negative'}, inplace=True)
test4.rename(columns={'pos':'Positive'}, inplace=True)
test4.rename(columns={'neu':'Neutral'}, inplace=True)
# This is the name of the output csv file
test4.to_csv("Call 8.csv")
How do I get this to transcribe multiple files from one folder instead of one file at a time? I could run this script multiple times, but I want to automate it so it picks the wav files up from a folder and runs on each of them. Say I have 15 audio wav files in my folder C:\Python. I want to make it an automated process where it runs the script and I get 15 csvs, one output for each file. Right now the script works, but I have to run it manually for every wav file to get each wav's output csv.
Also, as a second question (sorry!), is there a way to speed the transcription up? I tried breaking the wav files into smaller segments and sending those to Watson, but it didn't work. My reference was (https://github.com/freelanceastro/interview-transcriber).
Have you tried running this script several times? You could write a wrapper that launches this script in subprocesses, like this:
import subprocess
import sys
processes = []
for _ in range(5):
    processes.append(subprocess.Popen([sys.executable, "/path/to/script.py"]))

# now wait for them to finish
for process in processes:
    process.wait()
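If you go this route, the wrapper can also hand each wav file to the script instead of starting a fixed number of copies. This is only a sketch and assumes you modify the script to read its input filename from sys.argv[1] rather than the hard-coded 'Call 8.wav':

import glob
import subprocess
import sys

# Launch one copy of the (modified) script per wav file in the current folder.
processes = []
for wav in glob.glob("*.wav"):
    processes.append(subprocess.Popen([sys.executable, "/path/to/script.py", wav]))

# wait for all of them to finish
for process in processes:
    process.wait()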
It sounds like you want to find all the .wav files in a directory and process each one in turn.
import os
for filename in os.listdir(os.getcwd()):
    if filename.endswith('.wav'):
        with open(filename, 'rb') as audio_file:
            ...  # run the rest of your existing processing on audio_file here
You could even extend this so that it keeps running and only processes new files.
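Here is a minimal sketch of that idea, assuming your existing single-file logic has been wrapped in a hypothetical transcribe_to_csv(filename) function:

import os
import time

processed = set()  # filenames that have already been transcribed
while True:
    for filename in os.listdir(os.getcwd()):
        if filename.endswith('.wav') and filename not in processed:
            transcribe_to_csv(filename)  # hypothetical wrapper around your existing code
            processed.add(filename)
    time.sleep(30)  # poll the folder for new files every 30 seconds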
You could try turning your code into a function that scans the current directory for every file with a .wav extension (using os as mentioned above, or glob), and then calling that function for each file it finds. That would lead to something like this:
####RUN THIS PART FIRST#########
import json
from os.path import join, dirname
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
import threading
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
import pandas as pd
import glob
authenticator = IAMAuthenticator('xxyyzz')
service = SpeechToTextV1(authenticator=authenticator)
service.set_service_url('https://api.us-east.speech-to-text.watson.cloud.ibm.com')
models = service.list_models().get_result()
#print(json.dumps(models, indent=2))
model = service.get_model('en-US_BroadbandModel').get_result()
#print(json.dumps(model, indent=2))
def transcribe(infile, service):
    with open(infile, 'rb') as audio_file:
        output = service.recognize(
            audio=audio_file,
            speaker_labels=True,
            content_type='audio/wav',
            # timestamps=True,
            # word_confidence=True,
            inactivity_timeout=-1,
            model='en-US_NarrowbandModel',
            continuous=True).get_result(),
        # NOTE: the trailing comma above leaves `output` as a one-element tuple,
        # which the comprehensions below rely on when they iterate over it.

    # get data to a csv
    df0 = pd.DataFrame([i for elts in output for alts in elts['results'] for i in alts['alternatives']])
    df1 = pd.DataFrame([i for elts in output for i in elts['speaker_labels']])
    df0 = df0.drop(["timestamps"], axis=1)
    df1 = df1.drop(["final"], axis=1)
    df1 = df1.drop(['confidence'], axis=1)
    test3 = pd.concat([df0, df1], axis=1)

    # sentiment
    transcript = test3['transcript']
    transcript = transcript.dropna()
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    analyzer = SentimentIntensityAnalyzer()
    scores = []
    for txt in transcript:
        vs = analyzer.polarity_scores(txt)
        scores.append(vs)

    data = pd.DataFrame(transcript, columns=['Text'])
    data2 = pd.DataFrame(scores)
    final_dataset = pd.concat([data, data2], axis=1)

    test4 = pd.concat([test3, final_dataset], axis=1)
    test4 = test4.drop(['Text'], axis=1)
    test4.rename(columns={'neg': 'Negative'}, inplace=True)
    test4.rename(columns={'pos': 'Positive'}, inplace=True)
    test4.rename(columns={'neu': 'Neutral'}, inplace=True)

    # The output csv is named after the input wav file
    test4.to_csv(infile[:-4] + ".csv")
for i in glob.glob("*.wav"):
    transcribe(i, service)
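With this in place, a single run picks up every .wav file in the working directory and writes a matching .csv next to it, so a folder of 15 wav files yields 15 csvs.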
I think I might have something:
import os
import json
import time
# import threading
from pathlib import Path
import concurrent.futures
# from os.path import join, dirname
from ibm_watson import SpeechToTextV1
from ibm_watson.websocket import RecognizeCallback, AudioSource
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd
# Replace with your api key.
my_api_key = "abc123"
# You can add a directory path to Path() if you want to run
# the project from a different folder at some point.
directory = Path().absolute()
authenticator = IAMAuthenticator(my_api_key)
service = SpeechToTextV1(authenticator=authenticator)
service.set_service_url('https://api.us-east.speech-to-text.watson.cloud.ibm.com')
# I used this URL.
# service.set_service_url('https://stream.watsonplatform.net/speech-to-text/api')
models = service.list_models().get_result()
#print(json.dumps(models, indent=2))
model = service.get_model('en-US_BroadbandModel').get_result()
#print(json.dumps(model, indent=2))
# get data to a csv
########################RUN THIS PART SECOND#####################################
def process_data(json_data, output_path):
    print(f"Processing: {output_path.stem}")

    cols = ["transcript", "confidence"]
    dfdata = [[t[cols[0]], t[cols[1]]] for r in json_data.get('results') for t in r.get("alternatives")]

    df0 = pd.DataFrame(data=dfdata, columns=cols)
    df1 = pd.DataFrame(json_data.get("speaker_labels")).drop(["final", "confidence"], axis=1)

    # test3 = pd.concat([df0, df1], axis=1)
    test3 = pd.merge(df0, df1, left_index=True, right_index=True)

    # sentiment
    print(f"Getting sentiment for: {output_path.stem}")
    transcript = test3["transcript"]
    transcript.dropna(inplace=True)

    analyzer = SentimentIntensityAnalyzer()
    text = transcript
    scores = [analyzer.polarity_scores(txt) for txt in text]

    # data = pd.DataFrame(text, columns = ["Text"])
    data = transcript.to_frame(name="Text")
    data2 = pd.DataFrame(scores)

    # final_dataset = pd.concat([data, data2], axis=1)
    final_dataset = pd.merge(data, data2, left_index=True, right_index=True)

    # test4 = pd.concat([test3, final_dataset], axis=1)
    test4 = pd.merge(test3, final_dataset, left_index=True, right_index=True)
    test4.drop("Text", axis=1, inplace=True)
    test4.rename(columns={
        "neg": "Negative",
        "pos": "Positive",
        "neu": "Neutral",
    }, inplace=True)

    # This is the name of the output csv file
    test4.to_csv(output_path, index=False)
def process_audio_file(filename, output_type="csv"):
    audio_file_path = directory.joinpath(filename)

    # Update output path to consider `output_type` parameter.
    out_path = directory.joinpath(f"{audio_file_path.stem}.{output_type}")

    print(f"Current file: '{filename}'")

    with open(audio_file_path, "rb") as audio_file:
        data = service.recognize(
            audio=audio_file,
            speaker_labels=True,
            content_type="audio/wav",
            inactivity_timeout=-1,
            model="en-US_NarrowbandModel",
            continuous=True,
        ).get_result()

    print(f"Speech-to-text complete for: '{filename}'")

    # Return data and output path as collection.
    return [data, out_path]
def main():
    print("Running main()...")

    # Default num. workers == min(32, os.cpu_count() + 4)
    n_workers = os.cpu_count() + 2

    # Create generator for all .wav files in folder (and subfolders).
    file_gen = directory.glob("**/*.wav")

    with concurrent.futures.ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = {executor.submit(process_audio_file, f) for f in file_gen}
        for future in concurrent.futures.as_completed(futures):
            pkg = future.result()
            process_data(*pkg)
if __name__ == "__main__":
    print("Program to process audio files has started.")

    t_start = time.perf_counter()
    main()
    t_stop = time.perf_counter()

    print(f"Done! Processing completed in {t_stop - t_start} seconds.")
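A note on the design: each call to service.recognize() is a single blocking HTTP request, so the ThreadPoolExecutor mostly overlaps network waits rather than CPU work, which is why a small number of workers is enough. The CSVs themselves are written in the main thread as each transcription completes, while the remaining files keep transcribing in the background.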