Wrap an io.BufferedIOBase such that it becomes seek-able

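I was trying to answer a question about streaming audio from an HTTP server and then playing it with PyGame. I had the code mostly complete, but hit an error where the PyGame music functions tried to seek() on the urllib.HTTPResponse object.

According to the urllib docs, the urllib.HTTPResponse object (since v3.5) is an io.BufferedIOBase. I expected this would make the stream seek()able, however it does not.

Is there a way to wrap the io.BufferedIOBase such that it is smart enough to buffer enough data to handle the seek operation?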

import pygame
import urllib.request
import io

# Window size
WINDOW_WIDTH  = 400
WINDOW_HEIGHT = 400
# background colour
SKY_BLUE      = (161, 255, 254)

### Begin the streaming of a file
### Return the urllib HTTPResponse, a file-like object
def openURL( url ):
    result = None

    try:
        http_response = urllib.request.urlopen( url )
        print( "streamHTTP() - Fetching URL [%s]" % ( http_response.geturl() ) )
        print( "streamHTTP() - Response Status [%d] / [%s]" % ( http_response.status, http_response.reason ) )
        result = http_response
    except:
        print( "streamHTTP() - Error Fetching URL [%s]" % ( url ) )

    return result


### MAIN
pygame.init()
window  = pygame.display.set_mode( ( WINDOW_WIDTH, WINDOW_HEIGHT ) )
pygame.display.set_caption("Music Streamer")


clock = pygame.time.Clock()
done = False
while not done:

    # Handle user-input
    for event in pygame.event.get():
        if ( event.type == pygame.QUIT ):
            done = True
    # Keys
    keys = pygame.key.get_pressed()
    if ( keys[pygame.K_UP] ):
        if ( pygame.mixer.music.get_busy() ):
            print("busy")
        else:
            print("play")
            remote_music = openURL( 'http://127.0.0.1/example.wav' )
            if ( remote_music != None and remote_music.status == 200 ):
                pygame.mixer.music.load( io.BufferedReader( remote_music ) )
                pygame.mixer.music.play()

    # Re-draw the screen
    window.fill( SKY_BLUE )

    # Update the window, but not more than 60fps
    pygame.display.flip()
    clock.tick_busy_loop( 60 )

pygame.quit()

When this code runs and Up is pushed, it fails with the error:

streamHTTP() - Fetching URL [http://127.0.0.1/example.wav]
streamHTTP() - Response Status [200] / [OK]
io.UnsupportedOperation: seek
io.UnsupportedOperation: File or stream is not seekable.
io.UnsupportedOperation: seek
io.UnsupportedOperation: File or stream is not seekable.
Traceback (most recent call last):
  File "./sound_stream.py", line 57, in <module>
    pygame.mixer.music.load( io.BufferedReader( remote_music ) )
pygame.error: Unknown WAVE format

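I've also tried re-opening the io stream, and various other re-implementations of the same sort of thing.

If you're fine with using the requests module (which supports streaming) instead of urllib, you could use a wrapper like this: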

from io import BytesIO, SEEK_END, SEEK_SET

class ResponseStream(object):
    """Seekable, lazily filled file object over an iterator of byte chunks."""

    def __init__(self, request_iterator):
        self._bytes = BytesIO()
        self._iterator = request_iterator

    def _load_all(self):
        self._bytes.seek(0, SEEK_END)
        for chunk in self._iterator:
            self._bytes.write(chunk)

    def _load_until(self, goal_position):
        current_position = self._bytes.seek(0, SEEK_END)
        while current_position < goal_position:
            try:
                # write() returns the number of bytes written, not a position
                current_position += self._bytes.write(next(self._iterator))
            except StopIteration:
                break

    def tell(self):
        return self._bytes.tell()

    def read(self, size=None):
        left_off_at = self._bytes.tell()
        if size is None or size < 0:
            self._load_all()
        else:
            goal_position = left_off_at + size
            self._load_until(goal_position)

        self._bytes.seek(left_off_at)
        return self._bytes.read(size)

    def seek(self, position, whence=SEEK_SET):
        if whence == SEEK_END:
            # the end is only known once everything has been downloaded
            self._load_all()
        # apply the requested offset and report the new absolute position
        return self._bytes.seek(position, whence)

Then I think you could do something like this:

import pygame
import requests
from threading import Thread

WINDOW_WIDTH  = 400
WINDOW_HEIGHT = 400
SKY_BLUE      = (161, 255, 254)
URL           = 'http://localhost:8000/example.wav'

pygame.init()
window  = pygame.display.set_mode( ( WINDOW_WIDTH, WINDOW_HEIGHT ) )
pygame.display.set_caption("Music Streamer")
clock = pygame.time.Clock()
done = False
font = pygame.font.SysFont(None, 32)
state = 0

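def play_music():
    global state  # without this, "state = 0" below only creates a local variable
    response = requests.get(URL, stream=True)
    if response.status_code == 200:
        stream = ResponseStream(response.iter_content(64))
        pygame.mixer.music.load(stream)
        pygame.mixer.music.play()
    else:
        state = 0  # allow another attempt on the next key press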

while not done:

    for event in pygame.event.get():
        if ( event.type == pygame.QUIT ):
            done = True

        if event.type == pygame.KEYDOWN and state == 0:
            Thread(target=play_music).start()
            state = 1

    window.fill( SKY_BLUE )
    window.blit(font.render(str(pygame.time.get_ticks()), True, (0,0,0)), (32, 32))
    pygame.display.flip()
    clock.tick_busy_loop( 60 )

pygame.quit()

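The streaming is started in a Thread, so the PyGame event loop keeps running while the data is fetched.

I'm not 100% sure this works, but give it a try.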

According to the urllib docs, the urllib.HTTPResponse object (since v3.5) is an io.BufferedIOBase. I expected this would make the stream seek()able, however it does not.

That's right. The io.BufferedIOBase interface doesn't guarantee that the I/O object is seekable. For HTTPResponse objects, IOBase.seekable() returns False:

>>> import urllib.request
>>> response = urllib.request.urlopen("http://httpbin.org/get")
>>> response
<http.client.HTTPResponse object at 0x110870ca0>
>>> response.seekable()
False

That's because the BufferedIOBase implementation offered by HTTPResponse wraps a socket object, and sockets are not seekable either.

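You can't wrap a BufferedIOBase object in a BufferedReader object and add seeking support. The Buffered* wrapper objects can only wrap RawIOBase types, and they rely on the wrapped object to provide seeking support. You would have to emulate seeking at the raw I/O level, see below.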
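
To see that seeking support really comes from the raw object, here is a small sketch. It uses a hypothetical in-memory NonSeekableRaw class as a stand-in for the socket. BufferedReader.seekable() just reports what the raw stream says, and seek() fails with the same io.UnsupportedOperation seen in the question.

```python
import io

class NonSeekableRaw(io.RawIOBase):
    """Hypothetical stand-in for a socket: readable, but not seekable."""

    def __init__(self, data: bytes) -> None:
        self._data = data
        self._pos = 0

    def readable(self) -> bool:
        return True

    # seekable() is inherited from io.RawIOBase and returns False

    def readinto(self, buffer) -> int:
        chunk = self._data[self._pos:self._pos + len(buffer)]
        buffer[:len(chunk)] = chunk
        self._pos += len(chunk)
        return len(chunk)

buffered = io.BufferedReader(NonSeekableRaw(b"RIFF\x00\x00\x00\x00WAVE"))
print(buffered.seekable())  # False: BufferedReader defers to the raw object
try:
    buffered.seek(0)
except io.UnsupportedOperation as exc:
    print("seek failed:", exc)
```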

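You can still provide the same functionality at a higher level, but take into account that seeking on remote data is a lot more involved. This isn't a simple "change an OS variable that represents a file position on disk" operation. For larger remote files, seeking without backing the whole file on local disk could be as sophisticated as using HTTP range requests and local (in-memory or on-disk) buffers to balance sound playback performance against minimising local data storage. Doing this correctly for a wide range of use cases can be a lot of effort, so it is certainly not part of the Python standard library.

If your sound files are small

If your HTTP-sourced sound files are small enough (a few MB at most), then just read the whole response into an in-memory io.BytesIO() file object. I really don't think it is worth making this more complicated than that. By the time you have enough data to make a smarter approach worth pursuing, your files are large enough to take up too much memory!

So if your sound files are smaller (no more than a few MB), this is more than enough: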
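
As a reminder of what a range request looks like, here is a minimal sketch with urllib.request. range_request() is a hypothetical helper, and the URL is just the question's example.

```python
from typing import Optional
from urllib.request import Request

def range_request(url: str, start: int, end: Optional[int] = None) -> Request:
    """Build a GET for bytes start..end (inclusive); end=None means "to the end"."""
    spec = f"bytes={start}-" if end is None else f"bytes={start}-{end}"
    return Request(url, headers={"Range": spec})

# Nothing is sent yet. urllib.request.urlopen(req) would perform the request, and a
# server that honours it replies "206 Partial Content" with a Content-Range header.
req = range_request("http://127.0.0.1/example.wav", 1024, 2047)
print(req.get_header("Range"))  # bytes=1024-2047
```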

from io import BytesIO
import urllib.error
import urllib.request

def open_url(url):
    try:
        http_response = urllib.request.urlopen(url)
        print(f"streamHTTP() - Fetching URL [{http_response.geturl()}]")
        print(f"streamHTTP() - Response Status [{http_response.status}] / [{http_response.reason}]")
    except urllib.error.URLError:
        print(f"streamHTTP() - Error Fetching URL [{url}]")
        return None

    with http_response:  # close the connection once we're done with it
        if http_response.status != 200:
            print(f"streamHTTP() - Error Fetching URL [{url}]")
            return None
        # load the whole body into a fully seekable in-memory file object
        return BytesIO(http_response.read())

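This doesn't require writing a wrapper object. Because BytesIO is a native implementation, once the data has been fully copied over, access to it is faster than any Python-code wrapper could give you.

Note that this returns either a BytesIO file object or None, so you no longer need to test the response status: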
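
If the same code sometimes receives objects that are already seekable (a local file, say), a small hypothetical helper can apply the BytesIO copy only when it is needed. This is a sketch that assumes the payload is small enough to hold in memory.

```python
import io
import shutil
from typing import BinaryIO

def as_seekable(fileobj: BinaryIO) -> BinaryIO:
    """Return fileobj itself if it can seek, otherwise an in-memory copy of it.

    Only suitable for small payloads: a non-seekable stream is read fully into memory.
    """
    if fileobj.seekable():
        return fileobj
    buffer = io.BytesIO()
    shutil.copyfileobj(fileobj, buffer)
    buffer.seek(0)  # rewind so the consumer starts reading at the first byte
    return buffer
```

With an HTTP response that has already passed the status check, usage would be along the lines of `pygame.mixer.music.load(as_seekable(http_response))`.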

remote_music = open_url('http://127.0.0.1/example.wav')
if remote_music is not None:
    pygame.mixer.music.load(remote_music)
    pygame.mixer.music.play()

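If more than a few MB

Once you get past a few megabytes, you could try pre-loading the data into a local file object instead. You can make this more sophisticated by using a thread to have shutil.copyfileobj() copy most of the data into that file in the background. You then give the file to PyGame after loading just an initial amount of data.

Using an actual file object can also help performance here, because PyGame will try to minimise interjecting itself between the SDL mixer and the file data. If there is an actual file on disk with a file number (the OS-level identifier for a stream, something the SDL mixer library can make use of), then PyGame will operate directly on that file. This minimises blocking the GIL, which in turn helps the Python portions of your game perform better! If you pass in a filename (just a string), then PyGame gets out of the way entirely and leaves all file operations to the SDL library.

Here's such an implementation. On normal Python interpreter exit it should automatically clean up the downloaded files. It returns a filename for PyGame to work on. The remainder of the data is downloaded in a thread after the first few KB have been buffered. It avoids loading the same URL more than once, and I've made it thread-safe: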
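
From Python, you can tell an OS-backed file from an in-memory one by calling fileno(). This sketch uses a hypothetical has_os_file_descriptor() helper. It only illustrates that distinction and makes no claim about how PyGame decides internally which path to take.

```python
import io
import tempfile

def has_os_file_descriptor(fileobj) -> bool:
    """Return True if fileobj is backed by an OS-level file descriptor."""
    try:
        fileobj.fileno()
    except (io.UnsupportedOperation, AttributeError):
        return False
    return True

with tempfile.TemporaryFile() as on_disk:
    print(has_os_file_descriptor(on_disk))          # True
print(has_os_file_descriptor(io.BytesIO(b"data")))  # False
```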

import shutil
import urllib.error
import urllib.request
from tempfile import NamedTemporaryFile
from threading import Lock, Thread

INITIAL_BUFFER = 1024 * 8  # 8kb initial file read to start URL-backed files
_url_files_lock = Lock()
# stores open NamedTemporaryFile objects, keeping them 'alive'
# removing entries from here causes the file data to be deleted.
_url_files = {}


def open_url(url):
    with _url_files_lock:
        if url in _url_files:
            return _url_files[url].name

    try:
        http_response = urllib.request.urlopen(url)
        print(f"streamHTTP() - Fetching URL [{http_response.geturl()}]")
        print(f"streamHTTP() - Response Status [{http_response.status}] / [{http_response.reason}]")
    except urllib.error.URLError:
        print(f"streamHTTP() - Error Fetching URL [{url}]")
        return

    if http_response.status != 200:
        print(f"streamHTTP() - Error Fetching URL [{url}]")
        http_response.close()
        return

    fileobj = NamedTemporaryFile()

    content_length = http_response.getheader("Content-Length")
    if content_length is not None:
        try:
            content_length = int(content_length)
        except ValueError:
            content_length = None
        if content_length:
            # create sparse file of full length
            fileobj.seek(content_length - 1)
            fileobj.write(b"\0")
            fileobj.seek(0)

    fileobj.write(http_response.read(INITIAL_BUFFER))
    # flush so the initial data is visible to PyGame when it opens the file by name
    fileobj.flush()
    with _url_files_lock:
        if url in _url_files:
            # another thread raced us to this point, we lost, return their
            # result after cleaning up here
            fileobj.close()
            http_response.close()
            return _url_files[url].name

        # store the file object for this URL; this keeps the file
        # open and so readable if you have the filename.
        _url_files[url] = fileobj

    def copy_response_remainder():
        # copies file data from response to disk, for all data past INITIAL_BUFFER
        with http_response:
            shutil.copyfileobj(http_response, fileobj)
            fileobj.flush()  # don't leave the tail of the file in Python's write buffer

    t = Thread(daemon=True, target=copy_response_remainder)
    t.start()

    return fileobj.name
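
You can try the "sparse file" step in open_url() in isolation. This sketch moves it into a hypothetical preallocate() helper. Whether the file is actually stored sparsely depends on the filesystem. Either way, its reported size is the full length, and the unwritten region reads back as zero bytes.

```python
import os
from tempfile import NamedTemporaryFile
from typing import BinaryIO

def preallocate(fileobj: BinaryIO, length: int) -> None:
    """Grow fileobj to `length` bytes by writing one zero byte at the last offset."""
    if length > 0:
        fileobj.seek(length - 1)
        fileobj.write(b"\0")
        fileobj.flush()  # push the write to the OS so the new size is visible
        fileobj.seek(0)  # the download then starts writing at the beginning

with NamedTemporaryFile() as tmp:
    preallocate(tmp, 1_000_000)
    print(os.path.getsize(tmp.name))  # 1000000
```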

BytesIO() 解决方案一样,上面的 returns 或者 None 或准备传递给 pygame.mixer.music.load() 的值。

如果您尝试立即在声音文件中设置高级播放位置,上述方法可能 无效 ,因为稍后的数据可能尚未复制到文件中。这是一个权衡。
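
One way to soften that trade-off is to track how much data the background thread has copied, and block until a wanted offset is available before seeking there. This is a minimal sketch that assumes you control the copy loop: it replaces shutil.copyfileobj() with an explicit loop. ProgressCopier and wait_for() are hypothetical names.

```python
import io
import threading
from typing import BinaryIO, Optional

class ProgressCopier:
    """Copy src into dst on a background thread, tracking bytes copied so far.

    Hypothetical helper: lets a caller wait until a given offset has been
    written before seeking there, instead of racing the download.
    """

    def __init__(self, src: BinaryIO, dst: BinaryIO, chunk_size: int = 64 * 1024) -> None:
        self._src, self._dst, self._chunk_size = src, dst, chunk_size
        self._copied = 0
        self._done = False
        self._cond = threading.Condition()
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self) -> None:
        try:
            while True:
                chunk = self._src.read(self._chunk_size)
                if not chunk:
                    break
                self._dst.write(chunk)
                self._dst.flush()  # make the bytes visible to other readers of dst
                with self._cond:
                    self._copied += len(chunk)
                    self._cond.notify_all()
        finally:
            # wake any waiters even if the copy failed part-way
            with self._cond:
                self._done = True
                self._cond.notify_all()

    def wait_for(self, offset: int, timeout: Optional[float] = None) -> bool:
        """Block until `offset` bytes are copied or the copy ends; True if reached."""
        with self._cond:
            self._cond.wait_for(lambda: self._copied >= offset or self._done, timeout)
            return self._copied >= offset

# Usage with in-memory stand-ins for the HTTP response and the temporary file:
copier = ProgressCopier(io.BytesIO(b"\x00" * 100_000), io.BytesIO(), chunk_size=8192)
print(copier.wait_for(50_000, timeout=5))  # True
```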

寻找第三方库

如果您需要在远程 URL 上获得完整的寻求支持并且不想为他们使用磁盘 space 并且不想担心他们的大小,您不需要在这里重新发明 HTTP-as-seekable-file 轮子。您可以使用提供相同功能的现有项目。我发现两个提供基于 io.BufferedIOBase 的实现:

两者都使用 HTTP Range 请求来实现寻求支持。只需使用 httpio.open(URL)smart_open.open(URL) 并将其直接传递给 pygame.mixer.music.load();如果 URL 无法打开,您可以通过处理 IOError 异常来捕获它:

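# smart_open.open() defaults to text mode, so request binary mode explicitly;
# httpio.open(URL) is the alternative mentioned above.
from smart_open import open as url_open

try:
    remote_music = url_open('http://127.0.0.1/example.wav', 'rb')
except IOError:
    pass
else:
    pygame.mixer.music.load(remote_music)
    pygame.mixer.music.play()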

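smart_open uses an in-memory buffer to satisfy reads of a fixed size. However, it creates a new HTTP Range request for every seek that changes the current file position, so performance may vary. The SDL mixer executes a few seeks on audio files to determine their type, so I expect this to be a bit slower.

httpio can buffer blocks of data and so might handle seeks better. From a brief glance at the source code, though, when a buffer size is actually set, the cached blocks are never evicted from memory again. You'd eventually end up with the whole file in memory.

Implementing seeking ourselves, via io.RawIOBase

Finally, because I couldn't find an efficient HTTP-Range-backed I/O implementation, I wrote my own. The following implements the io.RawIOBase interface, specifically so you can then wrap the object in an io.BufferedReader(). That delegates caching to a caching buffer that is managed correctly when seeking: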
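
For contrast, bounding such a block cache takes little code. This is a minimal sketch of a least-recently-used block cache built on collections.OrderedDict. Here fetch_block stands in for whatever would issue the HTTP Range request for one block; none of this is httpio's actual API.

```python
from collections import OrderedDict
from typing import Callable

class LRUBlockCache:
    """Keep at most `max_blocks` fixed-size blocks, evicting the least recently used."""

    def __init__(self, fetch_block: Callable[[int], bytes], max_blocks: int = 16) -> None:
        self._fetch_block = fetch_block  # hypothetical: block index -> block bytes
        self._max_blocks = max_blocks
        self._blocks: "OrderedDict[int, bytes]" = OrderedDict()

    def get(self, index: int) -> bytes:
        if index in self._blocks:
            self._blocks.move_to_end(index)  # mark as most recently used
            return self._blocks[index]
        data = self._fetch_block(index)  # cache miss, e.g. one Range request
        self._blocks[index] = data
        if len(self._blocks) > self._max_blocks:
            self._blocks.popitem(last=False)  # evict the least recently used block
        return data

    def __len__(self) -> int:
        return len(self._blocks)
```

Memory use is then bounded by max_blocks times the block size, whatever the size of the remote file.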

import io
from copy import deepcopy
from functools import wraps
from typing import cast, overload, Callable, Optional, Tuple, TypeVar, Union
from urllib.request import urlopen, Request

T = TypeVar("T")

@overload
def _check_closed(_f: T) -> T: ...
@overload
def _check_closed(*, connect: bool, default: Union[bytes, int]) -> Callable[[T], T]: ...

def _check_closed(
    _f: Optional[T] = None,
    *,
    connect: bool = False,
    default: Optional[Union[bytes, int]] = None,
) -> Union[T, Callable[[T], T]]:
    def decorator(f: T) -> T:
        @wraps(cast(Callable, f))
        def wrapper(self, *args, **kwargs):
            if self.closed:
                raise ValueError("I/O operation on closed file.")
            if connect and (self._fp is None or self._fp.closed):
                self._connect()
                if self._fp is None:
                    # outside the seekable range, exit early
                    return default
            try:
                return f(self, *args, **kwargs)
            except Exception:
                self.close()
                raise
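            finally:
                # a partial Range response is used up once we've read past its
                # (inclusive) last byte; drop it so the next read reconnects
                if (
                    self._fp is not None
                    and self._range_end is not None
                    and self._pos > self._range_end
                ):
                    self._fp.close()
                    self._fp = None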

        return cast(T, wrapper)

    if _f is not None:
        return decorator(_f)

    return decorator

def _parse_content_range(
    content_range: str
) -> Tuple[Optional[int], Optional[int], Optional[int]]:
    """Parse a Content-Range header into a (start, end, length) tuple"""
    units, *range_spec = content_range.split(None, 1)
    if units != "bytes" or not range_spec:
        return (None, None, None)
    start_end, _, size = range_spec[0].partition("/")
    try:
        length: Optional[int] = int(size)
    except ValueError:
        length = None
    start_val, has_start_end, end_val = start_end.partition("-")
    start = end = None
    if has_start_end:
        try:
            start, end = int(start_val), int(end_val)
        except ValueError:
            pass
    return (start, end, length)

class HTTPRawIO(io.RawIOBase):
    """Wrap a HTTP socket to handle seeking via HTTP Range"""

    url: str
    closed: bool = False
    _pos: int = 0
    _size: Optional[int] = None
    _range_end: Optional[int] = None
    _fp: Optional[io.BufferedIOBase] = None  # the open HTTPResponse, if any

    def __init__(self, url_or_request: Union[Request, str]) -> None:
        if isinstance(url_or_request, str):
            self._request = Request(url_or_request)
        else:
            # copy request objects to avoid sharing state
            self._request = deepcopy(url_or_request)
        self.url = self._request.full_url
        self._connect(initial=True)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def close(self) -> None:
        if self.closed:
            return
        if self._fp is not None:
            self._fp.close()
            self._fp = None
        self.closed = True

    @_check_closed
    def tell(self) -> int:
        return self._pos

    def _connect(self, initial: bool = False) -> None:
        if self._fp is not None:
            self._fp.close()
            self._fp = None  # callers test for None to detect "no connection"
        if self._size is not None and self._pos >= self._size:
            # can't read past the end
            return
        request = self._request
        request.add_unredirected_header("Range", f"bytes={self._pos}-")
        response = urlopen(request)

        self.url = response.geturl()  # could have been redirected
        if response.status not in (200, 206):
            raise OSError(
                f"Failed to open {self.url}: "
                f"{response.status} ({response.reason})"
            )

        if initial:
            # verify that the server supports range requests
            if response.getheader("Accept-Ranges") != "bytes":
                raise OSError(
                    f"Resource doesn't support range requests: {self.url}"
                )
            if response.status == 200:
                # a full response: Content-Length is the size of the resource
                # (for a 206 it only covers the part being served)
                try:
                    length = int(response.getheader("Content-Length", ""))
                    if length >= 0:
                        self._size = length
                except ValueError:
                    pass

        # validate the range we are being served
        start, end, length = _parse_content_range(
            response.getheader("Content-Range", "")
        )
        if self._size is None:
            self._size = length
        if (
            (start is not None and start != self._pos)
            or (length is not None and length != self._size)
            # a plain 200 after seeking means the Range header was ignored
            or (self._pos > 0 and response.status != 206)
        ):
            # non-sensical range response
            raise OSError(
                f"Resource at {self.url} served invalid range: pos is "
                f"{self._pos}, range {start}-{end}/{length}"
            )
        if self._size and end is not None and end + 1 < self._size:
            # incomplete range, not reaching all the way to the end
            self._range_end = end
        else:
            self._range_end = None

        # Keep the HTTPResponse itself rather than detaching its raw socket:
        # the first body bytes may already sit in the response's read buffer
        # (pulled in while the headers were parsed), and the response also
        # honours Content-Length and chunked transfer encoding for us.
        self._fp = response

    @_check_closed
    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        relative_to = {
            io.SEEK_SET: 0,
            io.SEEK_CUR: self._pos,
            io.SEEK_END: self._size,
        }.get(whence)
        if relative_to is None:
            if whence == io.SEEK_END:
                raise IOError(
                    f"Can't seek from end on unsized resource {self.url}"
                )
            raise ValueError(f"whence value {whence} unsupported")
        if -offset > relative_to:  # can't seek to a point before the start
            raise OSError(22, "Invalid argument")

        self._pos = relative_to + offset
        # there is no point in optimising an existing connection
        # by reading from it if seeking forward below some threshold.
        # Use a BufferedReader to avoid seeking by small amounts or by 0
        if self._fp is not None:
            self._fp.close()
            self._fp = None
        return self._pos

    # read() and readinto() delegate to the HTTPResponse object; readall() is
    # inherited from io.RawIOBase, which calls read() repeatedly until EOF and
    # so also reconnects past the end of any partial Range response.

    def read(self, size: int = -1) -> Optional[bytes]:
        if size is None or size < 0:
            return self.readall()
        return self._read(size)

    @_check_closed(connect=True, default=b"")
    def _read(self, size: int) -> bytes:
        assert self._fp is not None  # show type checkers we already checked
        res = self._fp.read(size)
        self._pos += len(res)
        return res

    @_check_closed(connect=True, default=0)
    def readinto(self, buffer: bytearray) -> Optional[int]:
        assert self._fp is not None  # show type checkers we already checked
        n = self._fp.readinto(buffer)
        self._pos += n or 0
        return n

Remember that this is a RawIOBase object, which you really want to wrap in a BufferedReader(). Doing so in open_url() looks like this:

def open_url(url, *args, **kwargs):
    return io.BufferedReader(HTTPRawIO(url), *args, **kwargs)

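This gives you fully buffered I/O with full support for seeking over a remote URL. The BufferedReader implementation will also minimise resetting the HTTP connection when seeking. I found that when using this with the PyGame mixer, only a single HTTP connection is made, because all the test seeks were within the default 8KB buffer.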
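
You can check the buffering behaviour without a server. The sketch below uses a hypothetical in-memory CountingRawIO in place of HTTPRawIO and counts how often BufferedReader calls the raw seek(). For HTTPRawIO, each raw seek drops the connection. A seek that lands inside the already-buffered 8 KiB should not reach the raw object at all, while a far seek does.

```python
import io

class CountingRawIO(io.RawIOBase):
    """Hypothetical in-memory raw stream that counts seek() calls.

    Stands in for HTTPRawIO, where every raw seek closes the HTTP connection.
    """

    def __init__(self, data: bytes) -> None:
        self._data = data
        self._pos = 0
        self.seek_calls = 0

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def tell(self) -> int:
        # overridden so tell() is not routed through seek() and counted
        return self._pos

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        self.seek_calls += 1
        base = {io.SEEK_SET: 0, io.SEEK_CUR: self._pos, io.SEEK_END: len(self._data)}[whence]
        self._pos = base + offset
        return self._pos

    def readinto(self, buffer) -> int:
        chunk = self._data[self._pos:self._pos + len(buffer)]
        buffer[:len(chunk)] = chunk
        self._pos += len(chunk)
        return len(chunk)

raw = CountingRawIO(bytes(range(256)) * 100)  # 25,600 bytes of test data
reader = io.BufferedReader(raw, buffer_size=8192)
reader.read(12)     # the first read fills the 8 KiB buffer
reader.seek(4)      # lands inside the buffer
near = raw.seek_calls
reader.seek(20000)  # lands well outside the buffer
far = raw.seek_calls
print("raw seeks:", near, "after the near seek,", far, "after the far seek")
```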