How can I tell Dataflow to "use_unsupported_python_version" with PipelineOptions?
I'm trying to use Google Dataflow to copy data from one BigQuery table to another:
import apache_beam as beam
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import PipelineOptions
import argparse


def parseArgs():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--experiment',
        default='use_unsupported_python_version',
        help='This does not seem to do anything.')
    args, beam_args = parser.parse_known_args()
    return beam_args


def beamer(rows=[]):
    if len(rows) == 0:
        return
    project = 'myproject-474601'
    gcs_temp_location = 'gs://my_temp_bucket/tmp'
    gcs_staging_location = 'gs://my_temp_bucket/staging'
    table_spec = bigquery.TableReference(
        projectId=project,
        datasetId='mydataset',
        tableId='test')
    beam_options = PipelineOptions(
        parseArgs(),  # This doesn't seem to work.
        project=project,
        runner='DataflowRunner',
        job_name='unique-job-name',
        temp_location=gcs_temp_location,
        staging_location=gcs_staging_location,
        use_unsupported_python_version=True,  # This doesn't work either. :(
        experiment='use_unsupported_python_version'  # This also doesn't work.
    )
    with beam.Pipeline(options=beam_options) as p:
        quotes = p | beam.Create(rows)
        quotes | beam.io.WriteToBigQuery(
            table_spec,
            # custom_gcs_temp_location=gcs_temp_location,  # Not needed?
            method='FILE_LOADS',
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
    return


if __name__ == '__main__':
    beamer(rows=[{'id': 'ein', 'value': None, 'year': None, 'valueHistory': [{'year': 2021, 'amount': 900}]}])
But apparently Dataflow doesn't support my Python version, because I get this error:
Exception: Dataflow runner currently supports Python versions ['3.6', '3.7', '3.8'], got 3.9.7 (default, Sep 16 2021, 08:50:36)
[Clang 10.0.0 ].
To ignore this requirement and start a job using an unsupported version of Python interpreter, pass --experiment use_unsupported_python_version pipeline option.
So I tried adding the use_unsupported_python_version argument to PipelineOptions, to no avail. I also tried the experiment option. The official pipeline option docs show command-line args being merged successfully into PipelineOptions, so I tried that as well. But I still get the same unsupported version error. How can I get Dataflow to use my Python version?
Try passing experiments=['use_unsupported_python_version'] (note the plural option name, and that it takes a list). You can also remove your parseArgs implementation.
Add the option '--experiment=use_unsupported_python_version' as shown above.