Python: 如何将 mp3 块解码为 PCM 样本?
Python: How to decode a mp3 chunk into PCM samples?
我正在尝试捕捉 mp3 网络流的块并将它们解码为 PCM 样本以进行信号处理。我试图通过请求和 io.BytesIO 捕获音频以将数据保存为 .wav 文件。
我必须将 mp3 数据转换为 wav 数据,但我不知道如何。 (我的目标不是录制.wav文件,我只是这样做来测试算法。)
我找到了 pymedia 库,但它很旧(最后一次提交是在 2006 年),使用 python 2.7,对我来说无法安装。
也许可以使用 ffmpeg-python,但我刚刚看到使用文件作为输入和输出的示例。
这是我的代码:
import requests
import io
import soundfile as sf
import struct
import wave
import numpy as np
def main():
stream_url = r'http://dg-wdr-http-dus-dtag-cdn.cast.addradio.de/wdr/1live/diggi/mp3/128/stream.mp3'
r = requests.get(stream_url, stream=True)
sample_array = []
try:
for block in r.iter_content(1024):
data, samplerate = sf.read(io.BytesIO(block), format="RAW", channels=2, samplerate=44100, subtype='FLOAT',
dtype='float32')
sample_array = np.append(sample_array, data)
except KeyboardInterrupt:
print("...saving")
obj = wave.open('sounds/stream1.wav', 'w')
obj.setnchannels(1) # mono
obj.setsampwidth(2) # bytes
obj.setframerate(44100)
data_max = np.nanmax(abs(sample_array))
# fill WAV with samples from sample_array
for sample in sample_array:
if (np.isnan(sample) or np.isnan(32760 * sample / data_max)) is True:
continue
try:
value = int(32760 * sample / data_max) # normalization INT16
except ValueError:
value = 1
finally:
data = struct.pack('<h', value)
obj.writeframesraw(data)
obj.close()
print("end")
if __name__ == '__main__':
main()
你知道如何处理这个问题吗?
您缺少 mp3 流的解码。您只是将 mp3 文件保存为 wav。
您首先需要解码 mp3 音频。这将为您提供 PCM 样本 + 音频信息。
在 Irmen 和他的 "miniaudio" 和 "synthesizer" 库的帮助下,我可以解决问题。
问题是,大多数广播网络流都使用 ICECAST 协议,其中包含交错的元数据信息,因此您无法直接对其进行解码。
使用示例脚本 https://github.com/irmen/synthesizer/blob/master/examples/internetradio.py 作为模板,我可以编写一个脚本,记录网络流直到 KeyboardInterrupt 并将其保存为 .wav 文件。
这是我编辑的主要部分:
...
def _audio_playback(self, pcm_stream):
sample_array = None
with Output(mixing="sequential", frames_per_chunk=44100 // 4) as output:
print("begin recording")
while self.decode_flag:
try:
audio = pcm_stream.read(44100 * 2 * 2 // 20)
if not audio:
break
except (IOError, ValueError):
break
else:
sample = Sample.from_raw_frames(audio, 2, 44100, 2)
if sample_array is None:
sample_array = sample.get_frames_numpy_float()
else:
sample_array = np.append(sample_array, sample.get_frames_numpy_float(), axis=0)
print("...saving")
wavf.write(self.file_location, 44100, sample_array)
print("saved")
...
基于 Bendzko 的回答,这里是我的代码:
pip install pyaudio miniaudio
import threading
import urllib.request
import time
try:
import miniaudio
except ImportError:
miniaudio = None
import pyaudio
import ctypes
import sys
CHUNK = 4096
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16,channels=2,rate=44100,output=True)
class RadioThread(threading.Thread):
def run(self):
self.url = "https://impradio.bytemasters.gr/8002/stream"
#run in threading
client = miniaudio.IceCastClient(self.url)
pcm_stream = MiniaudioDecoderPcmStream(client.audio_format,client)
self.audio_playback(pcm_stream)
def audio_playback(self,pcm_stream):
global stop_peradio_thread
while stop_peradio_thread==False:
try:
audio = pcm_stream.read(CHUNK)
stream.write(audio.tobytes())
except:
pass
class MiniaudioDecoderPcmStream(miniaudio.StreamableSource):
def __init__(self, fmt, stream):
self.pcm_stream = miniaudio.stream_any(stream, fmt, dither=miniaudio.DitherMode.TRIANGLE)
def read(self, size):
try:
return self.pcm_stream.send(size)
except StopIteration:
return b""
def main():
global stop_peradio_thread
stop_peradio_thread = False
t1 = RadioThread()
t1.start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
stop_peradio_thread = True
t1.join()
sys.exit()
main()
我正在尝试捕捉 mp3 网络流的块并将它们解码为 PCM 样本以进行信号处理。我试图通过请求和 io.BytesIO 捕获音频以将数据保存为 .wav 文件。
我必须将 mp3 数据转换为 wav 数据,但我不知道如何。 (我的目标不是录制.wav文件,我只是这样做来测试算法。)
我找到了 pymedia 库,但它很旧(最后一次提交是在 2006 年),使用 python 2.7,对我来说无法安装。
也许可以使用 ffmpeg-python,但我刚刚看到使用文件作为输入和输出的示例。
这是我的代码:
import requests
import io
import soundfile as sf
import struct
import wave
import numpy as np
def main():
stream_url = r'http://dg-wdr-http-dus-dtag-cdn.cast.addradio.de/wdr/1live/diggi/mp3/128/stream.mp3'
r = requests.get(stream_url, stream=True)
sample_array = []
try:
for block in r.iter_content(1024):
data, samplerate = sf.read(io.BytesIO(block), format="RAW", channels=2, samplerate=44100, subtype='FLOAT',
dtype='float32')
sample_array = np.append(sample_array, data)
except KeyboardInterrupt:
print("...saving")
obj = wave.open('sounds/stream1.wav', 'w')
obj.setnchannels(1) # mono
obj.setsampwidth(2) # bytes
obj.setframerate(44100)
data_max = np.nanmax(abs(sample_array))
# fill WAV with samples from sample_array
for sample in sample_array:
if (np.isnan(sample) or np.isnan(32760 * sample / data_max)) is True:
continue
try:
value = int(32760 * sample / data_max) # normalization INT16
except ValueError:
value = 1
finally:
data = struct.pack('<h', value)
obj.writeframesraw(data)
obj.close()
print("end")
if __name__ == '__main__':
main()
你知道如何处理这个问题吗?
您缺少 mp3 流的解码。您只是将 mp3 文件保存为 wav。
您首先需要解码 mp3 音频。这将为您提供 PCM 样本 + 音频信息。
在 Irmen 和他的 "miniaudio" 和 "synthesizer" 库的帮助下,我可以解决问题。
问题是,大多数广播网络流都使用 ICECAST 协议,其中包含交错的元数据信息,因此您无法直接对其进行解码。
使用示例脚本 https://github.com/irmen/synthesizer/blob/master/examples/internetradio.py 作为模板,我可以编写一个脚本,记录网络流直到 KeyboardInterrupt 并将其保存为 .wav 文件。
这是我编辑的主要部分:
...
def _audio_playback(self, pcm_stream):
sample_array = None
with Output(mixing="sequential", frames_per_chunk=44100 // 4) as output:
print("begin recording")
while self.decode_flag:
try:
audio = pcm_stream.read(44100 * 2 * 2 // 20)
if not audio:
break
except (IOError, ValueError):
break
else:
sample = Sample.from_raw_frames(audio, 2, 44100, 2)
if sample_array is None:
sample_array = sample.get_frames_numpy_float()
else:
sample_array = np.append(sample_array, sample.get_frames_numpy_float(), axis=0)
print("...saving")
wavf.write(self.file_location, 44100, sample_array)
print("saved")
...
基于 Bendzko 的回答,这里是我的代码:
pip install pyaudio miniaudio
import threading
import urllib.request
import time
try:
import miniaudio
except ImportError:
miniaudio = None
import pyaudio
import ctypes
import sys
CHUNK = 4096
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16,channels=2,rate=44100,output=True)
class RadioThread(threading.Thread):
def run(self):
self.url = "https://impradio.bytemasters.gr/8002/stream"
#run in threading
client = miniaudio.IceCastClient(self.url)
pcm_stream = MiniaudioDecoderPcmStream(client.audio_format,client)
self.audio_playback(pcm_stream)
def audio_playback(self,pcm_stream):
global stop_peradio_thread
while stop_peradio_thread==False:
try:
audio = pcm_stream.read(CHUNK)
stream.write(audio.tobytes())
except:
pass
class MiniaudioDecoderPcmStream(miniaudio.StreamableSource):
def __init__(self, fmt, stream):
self.pcm_stream = miniaudio.stream_any(stream, fmt, dither=miniaudio.DitherMode.TRIANGLE)
def read(self, size):
try:
return self.pcm_stream.send(size)
except StopIteration:
return b""
def main():
global stop_peradio_thread
stop_peradio_thread = False
t1 = RadioThread()
t1.start()
while True:
try:
time.sleep(1)
except KeyboardInterrupt:
stop_peradio_thread = True
t1.join()
sys.exit()
main()