dask:从 Azure blob 中读取 parquet - AzureHttpError

dask: read parquet from Azure blob - AzureHttpError

我使用 dask.dataframe.to_parquet() 在 Azure blob 中创建了一个 parquet 文件。

我现在想读取该文件。我的做法如下:

STORAGE_OPTIONS={'account_name': 'ACCOUNT_NAME',
                 'account_key': 'ACCOUNT_KEY'}

df = dd.read_parquet('abfs://BLOB/FILE.parquet', storage_options=STORAGE_OPTIONS)

但我得到一个 AzureHttpError:

---------------------------------------------------------------------------
AzureHttpError                            Traceback (most recent call last)
<ipython-input-4-2184e772e417> in <module>
      3                  'account_key': 'ACCOUNT_KEY'}
      4 
----> 5 df = dd.read_parquet('abfs://BLOB/FILE', storage_options=STORAGE_OPTIONS)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, chunksize, **kwargs)
    231         filters=filters,
    232         split_row_groups=split_row_groups,
--> 233         **kwargs
    234     )
    235     if meta.index.name is not None:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py in read_metadata(fs, paths, categories, index, gather_statistics, filters, **kwargs)
    176         # correspond to a row group (populated below).
    177         parts, pf, gather_statistics, fast_metadata = _determine_pf_parts(
--> 178             fs, paths, gather_statistics, **kwargs
    179         )
    180 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\dask\dataframe\io\parquet\fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
    127                 open_with=fs.open,
    128                 sep=fs.sep,
--> 129                 **kwargs.get("file", {})
    130             )
    131             if gather_statistics is None:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\fastparquet\api.py in __init__(self, fn, verify, open_with, root, sep)
    109                 fn2 = join_path(fn, '_metadata')
    110                 self.fn = fn2
--> 111                 with open_with(fn2, 'rb') as f:
    112                     self._parse_header(f, verify)
    113                 fn = fn2

~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in open(self, path, mode, block_size, cache_options, **kwargs)
    722                 autocommit=ac,
    723                 cache_options=cache_options,
--> 724                 **kwargs
    725             )
    726             if not ac:

~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in _open(self, path, mode, block_size, autocommit, cache_options, **kwargs)
    552             autocommit=autocommit,
    553             cache_options=cache_options,
--> 554             **kwargs,
    555         )
    556 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
    582             cache_type=cache_type,
    583             cache_options=cache_options,
--> 584             **kwargs,
    585         )
    586 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in __init__(self, fs, path, mode, block_size, autocommit, cache_type, cache_options, **kwargs)
    954         if mode == "rb":
    955             if not hasattr(self, "details"):
--> 956                 self.details = fs.info(path)
    957             self.size = self.details["size"]
    958             self.cache = caches[cache_type](

~\AppData\Local\Continuum\anaconda3\lib\site-packages\fsspec\spec.py in info(self, path, **kwargs)
    499         if out:
    500             return out[0]
--> 501         out = self.ls(path, detail=True, **kwargs)
    502         path = path.rstrip("/")
    503         out1 = [o for o in out if o["name"].rstrip("/") == path]

~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in ls(self, path, detail, invalidate_cache, delimiter, **kwargs)
    446             # then return the contents
    447             elif self._matches(
--> 448                 container_name, path, as_directory=True, delimiter=delimiter
    449             ):
    450                 logging.debug(f"{path} appears to be a directory")

~\AppData\Local\Continuum\anaconda3\lib\site-packages\adlfs\core.py in _matches(self, container_name, path, as_directory, delimiter)
    386             prefix=path,
    387             delimiter=delimiter,
--> 388             num_results=None,
    389         )
    390 

~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\blob\baseblobservice.py in list_blob_names(self, container_name, prefix, num_results, include, delimiter, marker, timeout)
   1360                   '_context': operation_context,
   1361                   '_converter': _convert_xml_to_blob_name_list}
-> 1362         resp = self._list_blobs(*args, **kwargs)
   1363 
   1364         return ListGenerator(resp, self._list_blobs, args, kwargs)

~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\blob\baseblobservice.py in _list_blobs(self, container_name, prefix, marker, max_results, include, delimiter, timeout, _context, _converter)
   1435         }
   1436 
-> 1437         return self._perform_request(request, _converter, operation_context=_context)
   1438 
   1439     def get_blob_account_information(self, container_name=None, blob_name=None, timeout=None):

~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
    444                                  status_code,
    445                                  exception_str_in_one_line)
--> 446                     raise ex
    447             finally:
    448                 # If this is a location locked operation and the location is not set,

~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
    372                 except AzureException as ex:
    373                     retry_context.exception = ex
--> 374                     raise ex
    375                 except Exception as ex:
    376                     retry_context.exception = ex

~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\storageclient.py in _perform_request(self, request, parser, parser_args, operation_context, expected_errors)
    358                         # and raised as an azure http exception
    359                         _http_error_handler(
--> 360                             HTTPError(response.status, response.message, response.headers, response.body))
    361 
    362                     # Parse the response

~\AppData\Local\Continuum\anaconda3\lib\site-packages\azure\storage\common\_error.py in _http_error_handler(http_error)
    113     ex.error_code = error_code
    114 
--> 115     raise ex
    116 
    117 

AzureHttpError: Server encountered an internal error. Please try again after some time. ErrorCode: InternalError
<?xml version="1.0" encoding="utf-8"?><Error><Code>InternalError</Code><Message>Server encountered an internal error. Please try again after some time.
RequestId:...
Time:2020-04-15T02:44:06.8611398Z</Message></Error>

错误文本表明服务当时暂时不可用。如果该问题持续出现,您可以在 adlfs 项目中提交 issue;也许解决办法很简单,只需要他们那一端实现更完善的重试逻辑即可。