数据流SDK版本

Dataflow SDK version

我 运行 在通过 运行 来自 Datalab 单元格的代码测试 Dataflow 时遇到了问题。

import apache_beam as beam

# Pipeline options:
options                         = beam.options.pipeline_options.PipelineOptions()
gcloud_options                  = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name         = 'test002'
gcloud_options.project          = 'proj'
gcloud_options.staging_location = 'gs://staging'
gcloud_options.temp_location    = 'gs://tmp'
# gcloud_options.region           = 'europe-west2'

# Worker options:
worker_options                  = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb     = 30
worker_options.max_num_workers  = 10

# Standard options:
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'
# options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DirectRunner'

# Pipeline:

PL = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:samples.shakespeare] LIMIT 10'    
(
    PL | 'read'  >> beam.io.Read(beam.io.BigQuerySource(project='project', use_standard_sql=False, query=query))
       | 'write' >> beam.io.WriteToText('gs://test/test2.txt', num_shards=1)
)

PL.run()

print "Complete"

已经有各种成功的尝试,也有一些失败了。这很好并且可以理解,但我不明白的是我将 SDK 版本从 2.9.0 更改为 2.0.0 所做的工作,如下所示。谁能指出我做了什么以及如何回到 SDK 版本 2.9.0 吗?

你能看看云中的失败工作吗?post你看到了什么?

您可以通过 运行ning 查看您将使用的 SDK 版本:

!pip freeze | grep beam

在你的情况下,这应该 return:

apache-beam==2.0.0

然后通过在顶部添加一个单元格来强制使用所需的版本(即 2.9.0):

!pip install apache-beam[gcp]==2.9.0

如果您已经提交了作业,您可能需要重新启动内核(重置会话)以使更改生效。 使用不同 SDK 的作业之间存在一天的差异,因此我的猜测是您或其他人更改了依赖项(假设那些 运行 来自同一 Datalab 实例和笔记本)。也许没有意识到这一点(即内核重启)。