在 Vertex AI 中为单个 TFX 流水线组件指定机器类型

Specify machine type for a single TFX pipeline component in Vertex AI

我正在使用 TFX 在 Vertex AI 上构建 AI 管道。我已经按照 this tutorial 开始,然后我将管道调整为我自己的数据,该数据具有超过 1 亿行时间序列数据。由于内存问题,我的几个组件在中途被杀死,所以我只想为这些组件设置内存要求。我使用 KubeflowV2DagRunner 通过以下代码在 Vertex AI 中编排和启动管道:

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(
        default_image = 'gcr.io/watch-hop/hop-tfx-covid:0.6.2'
    ),
    output_filename=PIPELINE_DEFINITION_FILE)

_ = runner.run(
    create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        data_path=DATA_ROOT, metadata_path=METADATA_PATH))

A similar question has been answered on Stack Overflow, which has led me to a way to set memory requirements in AI Platform,但这些配置在 KubeflowV2DagRunnerConfig 中不再存在,所以我走投无路了。

如有任何帮助,我们将不胜感激。

** 编辑 **
我们将组件定义为带有 @component 装饰器的 python 函数,因此它们中的大多数都是自定义组件。对于训练组件,我知道您可以使用 tfx.Trainer class 指定机器类型,如 this tutorial 中所述,尽管我的问题是针对不进行任何训练的自定义组件。

原来你现在不能,但根据这个issue,这个功能即将到来。

另一种解决方案是将您的 TFX 管道转换为 Kubeflow 管道。 Vertex AI 管道支持 kubeflow,您可以使用这些管道在组件级别设置内存和 cpu 约束。

@component // imported from kfp.dsl
def MyComponent(Input[Dataset] input_data):
  // ...

@pipeline // imported from kfp.dsl
def MyPipeline(...):
  component = MyComponent(...)
  component.set_memory_limit('64G') // alternative to set_memory_request(...)

此解决方案的另一种选择是使用数据流梁 运行ner,它允许组件通过 Vertex 成为 运行 数据流集群。我仍在寻找一种方法来为自定义组件指定机器类型

示例光束输入:

BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
--project=  GOOGLE_CLOUD_PROJECT,
--temp_location= GCS_LOCAITON,
--runner=DataflowRunner

]

现在您将迁移到 Vertex AI