pandas read_csv with storage_options working locally but not in Dataflow

I'm trying to import data from an API into Google BigQuery (GBQ) and want to use Dataflow.

For reasons I don't know and can't imagine, the API only returns a URL to a ".csv.gz" file, which I then need to download and process before pushing the data into GBQ.

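The API also uses bearer-token authentication, so I was looking for a way to handle the download, the parsing, and the authentication in one go, and found: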

pd.read_csv('https://app.SOMEPROVIDER.com/api/reporting/download/SOMEID.csv.gz', storage_options={'Authorization': 'Bearer '+ bearer_token}, compression='gzip', header=0, sep=',', quotechar='"')

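This works very well when I use it in my local Beam pipeline.

However, as soon as I upload the pipeline to Dataflow and run it there, I get the error message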

ValueError: storage_options passed with file object or non-fsspec file path

Full trace:

```
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File ".\filename.py", line 144, in process
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 610, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 462, in _read
    parser = TextFileReader(filepath_or_buffer, **kwds)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 819, in __init__
    self._engine = self._make_engine(self.engine)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1050, in _make_engine
    return mapping[engine](self.f, **self.options)  # type: ignore[call-arg]
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1867, in __init__
    self._open_handles(src, kwds)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1362, in _open_handles
    self.handles = get_handle(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 558, in get_handle
    ioargs = _get_filepath_or_buffer(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 286, in _get_filepath_or_buffer
    raise ValueError(
ValueError: storage_options passed with file object or non-fsspec file path

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 63, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 64, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 79, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 80, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "dataflow_worker/shuffle_operations.py", line 84, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "dataflow_worker/shuffle_operations.py", line 261, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "dataflow_worker/shuffle_operations.py", line 268, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
  File "apache_beam/runners/worker/operations.py", line 353, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1290, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 752, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 875, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1386, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 712, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 713, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1225, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1306, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1223, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 572, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File ".\filename.py", line 144, in process
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 610, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 462, in _read
    parser = TextFileReader(filepath_or_buffer, **kwds)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 819, in __init__
    self._engine = self._make_engine(self.engine)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1050, in _make_engine
    return mapping[engine](self.f, **self.options)  # type: ignore[call-arg]
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1867, in __init__
    self._open_handles(src, kwds)
  File "/usr/local/lib/python3.8/site-packages/pandas/io/parsers.py", line 1362, in _open_handles
    self.handles = get_handle(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 558, in get_handle
    ioargs = _get_filepath_or_buffer(
  File "/usr/local/lib/python3.8/site-packages/pandas/io/common.py", line 286, in _get_filepath_or_buffer
    raise ValueError(
ValueError: storage_options passed with file object or non-fsspec file path [while running 'Fetch actual report data']
```

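Does anyone know why this works locally but not in the cloud? I thought it might have something to do with file systems and temporary files, but the error message doesn't make much sense...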

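According to the pandas documentation, the storage_options parameter is forwarded to urllib as request headers for HTTP(S) URLs, and is passed to fsspec only for other URLs such as s3 and gcs paths.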
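If you would rather not depend on how a particular pandas version treats storage_options, one option is to do the header handling yourself. The following is only a sketch under assumptions: `build_request` and `fetch_report` are hypothetical helper names, and it assumes the endpoint returns the raw gzipped bytes in the response body. It sends the bearer token through urllib, decompresses the payload, and hands pandas a plain in-memory buffer, so storage_options is never used.

```python
import gzip
import io
import urllib.request


def build_request(url: str, bearer_token: str) -> urllib.request.Request:
    """Attach the bearer token as an Authorization header (hypothetical helper)."""
    return urllib.request.Request(
        url, headers={"Authorization": "Bearer " + bearer_token}
    )


def fetch_report(url: str, bearer_token: str):
    """Download a gzipped CSV and parse it into a DataFrame (hypothetical helper)."""
    # Imported inside the function so the import runs on the worker that
    # executes it, and so the module stays importable without pandas.
    import pandas as pd

    with urllib.request.urlopen(build_request(url, bearer_token)) as response:
        raw = response.read()

    # Assumption: the body is the gzipped file itself. Decompress explicitly,
    # then parse from memory with the same CSV settings as the original call.
    csv_bytes = gzip.decompress(raw)
    return pd.read_csv(io.BytesIO(csv_bytes), header=0, sep=",", quotechar='"')
```

Inside a DoFn, the `process` method could call `fetch_report(url, bearer_token)` in place of the original `pd.read_csv(...)` call. The trade-off is that the whole file is held in memory twice (compressed and decompressed) before parsing.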

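It turned out to be just a version issue. The pandas version included in the Dataflow image did not yet interpret the storage_options parameter as authorization (header) information. When I passed a local wheel of the latest available pandas version via the --extra_package parameter, the problem went away.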
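To catch this kind of mismatch early, a worker-side version check can fail fast with a clearer message than the ValueError above. This is a rough sketch, not part of the original fix: the helper names are hypothetical, and the 1.3 threshold is my assumption based on the pandas 1.3.0 release notes, which list custom HTTP(S) headers for reading CSV and JSON as a new feature.

```python
from typing import Tuple


def parse_major_minor(version: str) -> Tuple[int, int]:
    """Return (major, minor) from a version string such as '1.2.4'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def supports_http_storage_options(version: str) -> bool:
    """True if this pandas version should forward storage_options as headers."""
    # Assumption: header forwarding for HTTP(S) URLs arrived in pandas 1.3.
    return parse_major_minor(version) >= (1, 3)


def check_worker_pandas() -> None:
    """Raise a descriptive error if the installed pandas is too old."""
    import pandas as pd  # imported lazily so it resolves on the worker

    if not supports_http_storage_options(pd.__version__):
        raise RuntimeError(
            "pandas %s does not forward storage_options as HTTP headers; "
            "ship a newer pandas to the workers" % pd.__version__
        )
```

Calling `check_worker_pandas()` from a DoFn's `setup` method would surface the installed version in the job logs before any download is attempted.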