Start multiple batch Dataflow jobs from the same Cloud Function execution
I created a custom template that reads data from BigQuery using the ReadFromBigQuery I/O connector. I use it like this:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--query',
            help='Query to retrieve from BigQuery acting as data source.')
        parser.add_argument(
            '--bucket',
            default='mybucketname',
            help='Bucket name for staging, temp and schema files.')
options = PipelineOptions()
args = options.view_as(CustomOptions)
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'myproject'
google_cloud_options.region = 'europe-west1'
google_cloud_options.staging_location = 'gs://{}/staging/'.format(args.bucket)
google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket)
options.view_as(StandardOptions).runner = 'DataflowRunner'
options.view_as(SetupOptions).save_main_session = True
options.view_as(SetupOptions).setup_file = './setup.py'
def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=args.query,
                use_standard_sql=True,
                flatten_results=False)
            ...
        )
setup.py
import setuptools
REQUIRED_PACKAGES = [
    'apache-beam',
    'apache-beam[gcp]',
    'google-cloud-storage'
]

setuptools.setup(
    name='ProcessEmailMetrics',
    version='0.0.1',
    description='Workflow to process email metrics.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    include_package_data=True
)
Finally, this is how I make the Dataflow API call from my Cloud Function:
import google.auth
from googleapiclient.discovery import build
credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)
query = """
SELECT ...
"""
BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
        'query': query
    }
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY
)
req.execute()
I start the job from a Cloud Function that listens to a Pub/Sub topic and launches the template through that API call. If I publish a single message on the topic, the pipeline finishes without any errors. However, if I launch several jobs from the same Cloud Function execution, I get two different errors.
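For context, here is a minimal sketch of what "several jobs from the same execution" looks like on my side; the entry-point name, the payload format and the loop over queries are assumptions for illustration, not my exact function:

import base64
import json

import google.auth
from googleapiclient.discovery import build


def on_pubsub_message(event, context):
    # Hypothetical Pub/Sub-triggered entry point; the payload format is an assumption.
    payload = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    credentials, _ = google.auth.default()
    service = build('dataflow', 'v1b3', credentials=credentials, cache_discovery=False)

    # Launch one templated batch job per query from this single execution.
    for query in payload['queries']:
        body = {
            'jobName': 'process-data',
            'gcsPath': 'gs://mybucket/templates/my_template',
            'parameters': {'query': query}
        }
        service.projects().locations().templates().create(
            projectId='myproject',
            location='europe-west1',
            body=body
        ).execute()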
The first one is about missing files. The first two errors are of this type:
HttpError accessing https://www.googleapis.com/storage/v1/b/my-bucket/o/tmp%2F6b2d2ba6-1%2Fbigquery-table-dump-000000000003.json?alt=media&generation=1628848711723613:
response: <{'x-guploader-uploadid': 'ADPycdvNyinmSGSiYZPZw3GAJ4scmNLnGGsv5DUhowTZUYn_L6z9kMZ5b8oFWzPR2utFmTogffLijzmyfcJN_amILlmWQZa7aQ', 'content-type': 'text/html; charset=UTF-8', 'date': 'Fri, 13 Aug 2021 09:58:37 GMT', 'vary': 'Origin, X-Origin', 'expires': 'Fri, 13 Aug 2021 09:58:37 GMT', 'cache-control': 'private, max-age=0', 'content-length': '94', 'server': 'UploadServer', 'status': '404'}>, content <No such object: my-bucket/tmp/6b2d2ba6-1/bigquery-table-dump-000000000003.json>
The second one is an index out of range error, again raised while reading the Avro files generated by ReadFromBigQuery. The next three errors are of this type:
2021-08-13 12:03:48.656 CESTError message from worker: Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 651, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start File "dataflow_worker/native_operations.py", line 48, in dataflow_worker.native_operations.NativeReadOperation.start File "/usr/local/lib/python3.7/site-packages/apache_beam/io/concat_source.py", line 84, in read for record in self._source_bundles[source_ix].source.read( IndexError: list index out of range
After these five errors occur, my pipeline fails and stops.
It looks like the ReadFromBigQuery connector is looking for a temporary file containing some BigQuery rows that does not actually exist, or that has been messed up.
As I said, if I launch only one Dataflow job it finishes without any errors, so I have two hypotheses.
It may be related to my Cloud Function. When two messages are published too close together, the function has no time to sleep, and maybe the file paths get mixed up:
- Could the cache_discovery=False option used when building the Dataflow service be causing this problem?
Or it may be caused by how my template is coded:
- Could the options.view_as(SetupOptions).save_main_session = True option be the key to the problem?
- Do I need to somehow provide a specific temporary dataset for each job execution when reading/writing on BigQuery?
- A different temp location on google_cloud_options.temp_location = 'gs://{}/tmp/'.format(args.bucket) for each job execution? (See the sketch after this list.)
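If a per-job temp location were the fix, one way it might be supplied is at launch time through the templates.create RuntimeEnvironment; this is only a sketch of that idea, and I have not verified that it reaches the BigQuery export path:

import uuid

# Hypothetical launch body with a per-job temp path via RuntimeEnvironment.
BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
        'query': query
    },
    'environment': {
        # 'tempLocation' asks Dataflow to use this GCS path for the job's temp files.
        'tempLocation': 'gs://mybucket/tmp/' + str(uuid.uuid4())
    }
}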
I need to be able to launch several Dataflow jobs from the same Cloud Function execution, so the current behavior does not fit my project's needs.
This is one of my failed jobs: 2021-08-13_02_54_10-11165491620802897150.
Any idea on how to solve this?
Update:
Versions:
python: 3.7.3 (on Cloud Shell)
beam: 2.31.0 (on Cloud Shell)
beam: undefined (on setup.py)
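One thing "undefined" hides is that setup.py does not pin the Beam version the workers install. As a sketch only, pinning it to match the launcher would look like this (2.31.0 is an assumption taken from the Cloud Shell version above):

# setup.py, with the worker-side Beam dependency pinned to match the launcher.
REQUIRED_PACKAGES = [
    'apache-beam[gcp]==2.31.0',
    'google-cloud-storage'
]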
I think the problem is that both pipelines run their BigQuery export into the same temporary directory, and they interfere with each other. You can give each of them a different directory, as follows:
Could you try providing a separate GCS location for the ReadFromBigQuery transform? You would do something like this:
class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        ...
        parser.add_value_provider_argument(
            '--export_location',
            help='GCS location to perform Bigquery export')
        ...
And in your pipeline, you would pass this export location separately:
def run():
    with beam.Pipeline(options=options) as p:
        (
            p
            | "Read from BigQuery" >> beam.io.ReadFromBigQuery(
                query=args.query,
                use_standard_sql=True,
                flatten_results=False,
                gcs_location=args.export_location)
            ...
        )
Finally, you would automatically generate a new export location every time you launch a pipeline:
import uuid

BODY = {
    'jobName': 'process-data',
    'gcsPath': 'gs://mybucket/templates/my_template',
    'parameters': {
        'query': query,
        'export_location': 'gs://mybucket/templates/my_template/tmp/' + str(uuid.uuid4())
    }
}

req = service.projects().locations().templates().create(
    projectId='myproject',
    location='europe-west1',
    body=BODY
)
req.execute()