Python: 如何将 mp3 块解码为 PCM 样本?

Python: How to decode a mp3 chunk into PCM samples?

我正在尝试捕捉 mp3 网络流的块并将它们解码为 PCM 样本以进行信号处理。我试图通过请求和 io.BytesIO 捕获音频以将数据保存为 .wav 文件。

我必须将 mp3 数据转换为 wav 数据,但我不知道如何。 (我的目标不是录制.wav文件,我只是这样做来测试算法。)

我找到了 pymedia 库,但它很旧(最后一次提交是在 2006 年),使用 python 2.7,对我来说无法安装。

也许可以使用 ffmpeg-python,但我刚刚看到使用文件作为输入和输出的示例。

这是我的代码:

import requests
import io
import soundfile as sf
import struct
import wave
import numpy as np


def main():
    stream_url = r'http://dg-wdr-http-dus-dtag-cdn.cast.addradio.de/wdr/1live/diggi/mp3/128/stream.mp3'
    r = requests.get(stream_url, stream=True)
    sample_array = []
    try:
        for block in r.iter_content(1024):
            data, samplerate = sf.read(io.BytesIO(block), format="RAW", channels=2, samplerate=44100, subtype='FLOAT',
                                       dtype='float32')
            sample_array = np.append(sample_array, data)

    except KeyboardInterrupt:
        print("...saving")
        obj = wave.open('sounds/stream1.wav', 'w')
        obj.setnchannels(1)  # mono
        obj.setsampwidth(2)  # bytes
        obj.setframerate(44100)

        data_max = np.nanmax(abs(sample_array))

        # fill WAV with samples from sample_array
        for sample in sample_array:
            if (np.isnan(sample) or np.isnan(32760 * sample / data_max)) is True:
                continue
            try:
                value = int(32760 * sample / data_max)  # normalization INT16
            except ValueError:
                value = 1
            finally:
                data = struct.pack('<h', value)
                obj.writeframesraw(data)

        obj.close()
        print("end")


if __name__ == '__main__':
    main()

你知道如何处理这个问题吗?

您缺少 mp3 流的解码。您只是将 mp3 文件保存为 wav。

您首先需要解码 mp3 音频。这将为您提供 PCM 样本 + 音频信息。

Irmen 和他的 "miniaudio" 和 "synthesizer" 库的帮助下,我可以解决问题。

问题是,大多数广播网络流都使用 ICECAST 协议,其中包含交错的元数据信息,因此您无法直接对其进行解码。

使用示例脚本 https://github.com/irmen/synthesizer/blob/master/examples/internetradio.py 作为模板,我可以编写一个脚本,记录网络流直到 KeyboardInterrupt 并将其保存为 .wav 文件。

这是我编辑的主要部分:

 ...

 def _audio_playback(self, pcm_stream):
    sample_array = None

    with Output(mixing="sequential", frames_per_chunk=44100 // 4) as output:
        print("begin recording")
        while self.decode_flag:
            try:
                audio = pcm_stream.read(44100 * 2 * 2 // 20)
                if not audio:
                    break
            except (IOError, ValueError):
                break
            else:
                sample = Sample.from_raw_frames(audio, 2, 44100, 2)
                if sample_array is None:
                    sample_array = sample.get_frames_numpy_float()
                else:
                    sample_array = np.append(sample_array, sample.get_frames_numpy_float(), axis=0)
        print("...saving")
        wavf.write(self.file_location, 44100, sample_array)
        print("saved")

...

基于 Bendzko 的回答,这里是我的代码:

pip install pyaudio miniaudio

import threading

import urllib.request

import time
try:
    import miniaudio
except ImportError:
    miniaudio = None
    
import pyaudio

import ctypes

import sys 

CHUNK = 4096

p = pyaudio.PyAudio()

stream = p.open(format=pyaudio.paInt16,channels=2,rate=44100,output=True)

    



class RadioThread(threading.Thread):
    def run(self):
        self.url = "https://impradio.bytemasters.gr/8002/stream"
        #run in threading
        client = miniaudio.IceCastClient(self.url)
        pcm_stream = MiniaudioDecoderPcmStream(client.audio_format,client)
        self.audio_playback(pcm_stream)


    def audio_playback(self,pcm_stream):
        global stop_peradio_thread
        while stop_peradio_thread==False:
            try:
                audio = pcm_stream.read(CHUNK)
                stream.write(audio.tobytes())
            except:
                pass
                
                
class MiniaudioDecoderPcmStream(miniaudio.StreamableSource):
    def __init__(self, fmt, stream):
        self.pcm_stream = miniaudio.stream_any(stream, fmt, dither=miniaudio.DitherMode.TRIANGLE)

    def read(self, size):
        try:
            return self.pcm_stream.send(size)
        except StopIteration:
            return b""

def main():
    global stop_peradio_thread
    stop_peradio_thread = False
    t1 = RadioThread()
    t1.start()
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            stop_peradio_thread = True
            t1.join()
            sys.exit()
            
main()