Start multiple batch Dataflow jobs from the same Cloud Function execution

I have created a custom template that reads from BigQuery using the ReadFromBigQuery I/O connector. I use it like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Runtime parameter (ValueProvider): supplied at each template launch.
        parser.add_value_provider_argument(
            '--query',
            help='Query to retrieve from BigQuery acting as data source.')
        # Build-time argument: fixed when the template is staged.
        parser.add_argument(
            '--bucket',
            default='mybucketname',
            help='Bucket name for staging, temp and schema files.')

options = PipelineOptions()
args = options.view_as(CustomOptions)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'myproject'
google_cloud_options.region = 'europe-west1'
google_cloud_options.staging_location = 'gs://{}/staging/'.format(args.bucket)
google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket)
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).setup_file = './setup.py'


def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                                          query=args.query,
                                          use_standard_sql=True,
                                          flatten_results=False)
            ...
        )
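
For context, this is roughly how such a pipeline gets staged as a classic template; a minimal sketch, assuming the template path matches the gcsPath used later from the Cloud Function and using the template_location option from GoogleCloudOptions:

# Staging sketch (assumed path): setting template_location makes DataflowRunner
# stage a classic template at this GCS location instead of running a job right away.
google_cloud_options.template_location = 'gs://mybucket/templates/my_template'

run()  # builds the pipeline graph and stages the template; no job is started here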

setup.py

import setuptools

REQUIRED_PACKAGES = [
    'apache-beam',
    'apache-beam[gcp]',
    'google-cloud-storage'
]

setuptools.setup(
    name='ProcessEmailMetrics',
    version='0.0.1',
    description='Workflow to process email metrics.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True
)

Finally, this is how I call the Dataflow API from the Cloud Function:

import google.auth
from googleapiclient.discovery import build

credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

query = """
SELECT ...
"""

BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
      'query' : query
    }
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY
)

req.execute()

I start these jobs from a Cloud Function that listens to a Pub/Sub topic and launches the template through the API call above; the entry point looks roughly like the sketch below. If I publish a single message to the topic, the pipeline finishes without any errors. However, if I start several jobs from the same Cloud Function execution, I get two different kinds of errors.
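
A minimal sketch of such an entry point (the function name, payload handling and the build_queries helper are illustrative, not the real code):

# Illustrative Pub/Sub-triggered entry point; names here are assumptions.
import google.auth
from googleapiclient.discovery import build

def process_message(event, context):
    """Triggered by a Pub/Sub message; may launch several Dataflow jobs."""
    credentials, _ = google.auth.default()
    service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

    # One query per job; in the real function these would come from the message payload.
    for i, query in enumerate(build_queries(event)):  # build_queries is hypothetical
        body = {
            'jobName': 'process-data-{}'.format(i),
            'gcsPath': 'gs://mybucket/templates/my_template',
            'parameters': {'query': query},
        }
        service.projects().locations().templates().create(
            projectId='myproject',
            location='europe-west1',
            body=body,
        ).execute()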

The first kind is about a missing file; the first two errors are of this type:

HttpError accessing https://www.googleapis.com/storage/v1/b/my-bucket/o/tmp%2F6b2d2ba6-1%2Fbigquery-table-dump-000000000003.json?alt=media&generation=1628848711723613: response: <{'x-guploader-uploadid': 'ADPycdvNyinmSGSiYZPZw3GAJ4scmNLnGGsv5DUhowTZUYn_L6z9kMZ5b8oFWzPR2utFmTogffLijzmyfcJN_amILlmWQZa7aQ', 'content-type': 'text/html; charset=UTF-8', 'date': 'Fri, 13 Aug 2021 09:58:37 GMT', 'vary': 'Origin, X-Origin', 'expires': 'Fri, 13 Aug 2021 09:58:37 GMT', 'cache-control': 'private, max-age=0', 'content-length': '94', 'server': 'UploadServer', 'status': '404'}>, content <No such object: my-bucket/tmp/6b2d2ba6-1/bigquery-table-dump-000000000003.json>

The second kind is an IndexError (list index out of range), again raised while reading the Avro files generated by ReadFromBigQuery; the next three errors are of this type:

2021-08-13 12:03:48.656 CEST Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/concat_source.py", line 84, in read
    for record in self._source_bundles[source_ix].source.read(
IndexError: list index out of range

After these five errors occur, my pipeline fails and stops.

It looks like the ReadFromBigQuery connector is looking for a temporary file containing some BigQuery rows that does not actually exist, or that has somehow been corrupted.

As I said, if I start only one Dataflow job it finishes without any errors, so I have two hypotheses.

  1. It may be related to my Cloud Function. When two messages are published too close together, the function has no time to sleep between launches, and maybe the file paths end up getting mixed up.

    • Could the cache_discovery=False option used when building the Dataflow service be causing this?
  2. It may be caused by how my template is coded:

    • Could the options.view_as(SetupOptions).save_main_session = True option be the key to the problem?
    • Do I need to somehow provide a specific temporary dataset for each job execution when reading from/writing to BigQuery?
    • Or a different temporary location per job execution for google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket)? (see the sketch after this list)
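
On that last point: with classic templates the pipeline temp location can be overridden at launch time through the environment field of the templates.create request. A sketch of that test, assuming the bucket path and the uuid suffix (the suggestion further below tackles the same collision more directly with a dedicated gcs_location for ReadFromBigQuery):

# Sketch (assumed path): give every launch its own tempLocation via the
# RuntimeEnvironment field of the templates.create request body.
import uuid

BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {'query': query},
    'environment': {
        # Per-launch temp prefix so concurrent jobs do not share temp files.
        'tempLocation': 'gs://mybucket/tmp/{}'.format(uuid.uuid4()),
    },
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY,
)
req.execute()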

I need to be able to start several Dataflow jobs from the same Cloud Function execution, so the current behavior does not fit my project's needs.

This is one of my failed jobs: 2021-08-13_02_54_10-11165491620802897150.

Any idea how to fix this?

Update:

Versions

python: 3.7.3 (on Cloud Shell)
beam: 2.31.0 (on Cloud Shell)
beam: undefined (on setup.py)
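
Since the setup.py above does not pin apache-beam, one thing worth ruling out is a mismatch between the SDK used to stage the template (2.31.0 in Cloud Shell, per the list above) and whatever the workers resolve at install time. A minimal sketch of pinning it, assuming 2.31.0 is the intended version:

# setup.py dependency list with the Beam SDK pinned (version assumed from above).
REQUIRED_PACKAGES = [
    'apache-beam[gcp]==2.31.0',
    'google-cloud-storage',
]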

I think the problem is that both pipelines run the BigQuery export into the same temporary directory and interfere with each other, so you can give each of them its own directory. Could you try providing a separate GCS location for the ReadFromBigQuery transform? You would do something like this:

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        ...
        parser.add_value_provider_argument(
            '--export_location',
            help='GCS location to perform Bigquery export')
        ...

And in your pipeline you would pass this export location separately:

def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                                          query=args.query,
                                          use_standard_sql=True,
                                          flatten_results=False,
                                          gcs_location=args.export_location)  # export_location is read via the CustomOptions view (args)
            ...
        )

And finally, you would automatically generate a new export location every time you launch a pipeline:

import uuid  # needed to generate a unique export location per launch

BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
      'query' : query,
      'export_location': 'gs://mybucket/templates/my_template/tmp/' + str(uuid.uuid4())
    }
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY
)

req.execute()