我可以从 HTTP(s) 八位字节流中读取镶木地板吗?

Can I read parquet from HTTP(s) octet-stream?

一些后端端点 returns 八位字节流中的镶木地板文件。

在 pandas 我可以做这样的事情:

result = requests.get("https://..../file.parquet")
df = pd.read_parquet(io.BytesIO(result.content))

我能以某种方式在 Dask 中完成吗?

此代码:

dd.read_parquet("https://..../file.parquet")

引发异常(很明显,因为这是类似字节的对象):

  File "to_parquet_dask.py", line 153, in <module>
    main(*parser.parse_args())
  File "to_parquet_dask.py", line 137, in main
    download_parquet(
  File "to_parquet_dask.py", line 121, in download_parquet
    dd.read_parquet(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
    read_metadata_result = engine.read_metadata(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
    parts, pf, gather_statistics, base_path = _determine_pf_parts(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 148, in _determine_pf_parts
    elif fs.isdir(paths[0]):
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 88, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 69, in sync
    raise result[0]
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/asyn.py", line 25, in _runner
    result[0] = await coro
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 418, in _isdir
    return bool(await self._ls(path))
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 195, in _ls
    out = await self._ls_real(url, detail=detail, **kwargs)
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fsspec/implementations/http.py", line 150, in _ls_real
    text = await r.text()
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1082, in text
    return self._body.decode(encoding, errors=errors)  # type: ignore
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x90 in position 7: invalid start byte

UPD

随着@mdurant 回答的 fsspec 的变化,我得到了错误

ValueError: Cannot seek streaming HTTP file

所以我将“simplecache::”放入我的 url 中,然后我将面对下一个:

Traceback (most recent call last):
  File "to_parquet_dask.py", line 161, in <module>
    main(*parser.parse_args())
  File "to_parquet_dask.py", line 145, in main
    download_parquet(
  File "to_parquet_dask.py", line 128, in download_parquet
    dd.read_parquet(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 313, in read_parquet
    read_metadata_result = engine.read_metadata(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 733, in read_metadata
    parts, pf, gather_statistics, base_path = _determine_pf_parts(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/dask/dataframe/io/parquet/fastparquet.py", line 185, in _determine_pf_parts
    pf = ParquetFile(
  File "/home/bc30138/Documents/CODE/flexydrive/driver_style/.venv/lib/python3.8/site-packages/fastparquet/api.py", line 127, in __init__
    raise ValueError("Opening directories without a _metadata requires"
ValueError: Opening directories without a _metadata requiresa filesystem compatible with fsspec

临时解决方法

也许这种方式很脏而且不是最优的,但有些工作:

@dask.delayed
def parquet_from_http(url, token):
    result = requests.get(
        url,
        headers={'Authorization': token}
    )
    return pd.read_parquet(io.BytesIO(result.content))
    
delayed_download = parquet_from_http(url, token)
df = dd.from_delayed(delayed_download, meta=meta)

p.s。这种方法中的 meta 参数是必要的,否则 dask 将使用此函数两次:找出 meta 而不是计算,因此将发出两个请求。

这不是答案,但我相信 fsspec 中的以下更改会解决您的问题。如果您愿意尝试确认,我们可以将其打补丁。

--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -472,7 +472,10 @@ class HTTPFileSystem(AsyncFileSystem):

     async def _isdir(self, path):
         # override, since all URLs are (also) files
-        return bool(await self._ls(path))
+        try:
+            return bool(await self._ls(path))
+        except (FileNotFoundError, ValueError):
+            return False

(我们可以将它放在一个分支中,如果这样可以让您更容易安装)

-编辑-

第二个问题(这在两个镶木地板引擎中都是一样的)源于服务器要么不提供文件的大小,要么不允许范围获取。 parquet 格式需要随机访问数据才能读取。解决这个问题的唯一方法(除了改进服务器)是在本地复制整个文件,例如,通过在 URL.

前面加上“simplecache::”