数据流SDK版本
Dataflow SDK version
我 运行 在通过 运行 来自 Datalab 单元格的代码测试 Dataflow 时遇到了问题。
import apache_beam as beam
# Pipeline options:
options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'test002'
gcloud_options.project = 'proj'
gcloud_options.staging_location = 'gs://staging'
gcloud_options.temp_location = 'gs://tmp'
# gcloud_options.region = 'europe-west2'
# Worker options:
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb = 30
worker_options.max_num_workers = 10
# Standard options:
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'
# Pipeline:
PL = beam.Pipeline(options=options)
query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(
PL | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='project', use_standard_sql=False, query=query))
| 'write' >> beam.io.WriteToText('gs://test/test2.txt', num_shards=1)
)
PL.run()
print "Complete"
已经有各种成功的尝试,也有一些失败了。这很好并且可以理解,但我不明白的是我将 SDK 版本从 2.9.0 更改为 2.0.0 所做的工作,如下所示。谁能指出我做了什么以及如何回到 SDK 版本 2.9.0 吗?
你能看看云中的失败工作吗?post你看到了什么?
您可以通过 运行ning 查看您将使用的 SDK 版本:
!pip freeze | grep beam
在你的情况下,这应该 return:
apache-beam==2.0.0
然后通过在顶部添加一个单元格来强制使用所需的版本(即 2.9.0):
!pip install apache-beam[gcp]==2.9.0
如果您已经提交了作业,您可能需要重新启动内核(重置会话)以使更改生效。
使用不同 SDK 的作业之间存在一天的差异,因此我的猜测是您或其他人更改了依赖项(假设那些 运行 来自同一 Datalab 实例和笔记本)。也许没有意识到这一点(即内核重启)。
我 运行 在通过 运行 来自 Datalab 单元格的代码测试 Dataflow 时遇到了问题。
import apache_beam as beam
# Pipeline options:
options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'test002'
gcloud_options.project = 'proj'
gcloud_options.staging_location = 'gs://staging'
gcloud_options.temp_location = 'gs://tmp'
# gcloud_options.region = 'europe-west2'
# Worker options:
worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb = 30
worker_options.max_num_workers = 10
# Standard options:
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'
# Pipeline:
PL = beam.Pipeline(options=options)
query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'
(
PL | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='project', use_standard_sql=False, query=query))
| 'write' >> beam.io.WriteToText('gs://test/test2.txt', num_shards=1)
)
PL.run()
print "Complete"
已经有各种成功的尝试,也有一些失败了。这很好并且可以理解,但我不明白的是我将 SDK 版本从 2.9.0 更改为 2.0.0 所做的工作,如下所示。谁能指出我做了什么以及如何回到 SDK 版本 2.9.0 吗?
你能看看云中的失败工作吗?post你看到了什么?
您可以通过 运行ning 查看您将使用的 SDK 版本:
!pip freeze | grep beam
在你的情况下,这应该 return:
apache-beam==2.0.0
然后通过在顶部添加一个单元格来强制使用所需的版本(即 2.9.0):
!pip install apache-beam[gcp]==2.9.0
如果您已经提交了作业,您可能需要重新启动内核(重置会话)以使更改生效。 使用不同 SDK 的作业之间存在一天的差异,因此我的猜测是您或其他人更改了依赖项(假设那些 运行 来自同一 Datalab 实例和笔记本)。也许没有意识到这一点(即内核重启)。