Vertex AI 模型批量预测,引用云存储上现有模型和输入文件的问题
Vertex AI Model Batch prediction, issue with referencing existing model and input file on Cloud Storage
我正在努力正确设置执行以下操作的 Vertex AI 管道:
- 从 API 读取数据并存储到 GCS 并作为批量预测的输入。
- 获取现有模型(Vertex AI 上的视频分类)
- 使用第 1 点的输入创建批量预测作业。
正如将会看到的那样,我对 Vertex Pipelines/Kubeflow 没有太多经验,因此我要求 help/advice,希望这只是一些初学者的错误。
这是我用作管道的代码的要点
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
Output,
Artifact,
Model,
)
PROJECT_ID = 'my-gcp-project'
BUCKET_NAME = "mybucket"
PIPELINE_ROOT = "{}/pipeline_root".format(BUCKET_NAME)
@component
def get_input_data() -> str:
# getting data from API, save to Cloud Storage
# return GS URI
gcs_batch_input_path = 'gs://somebucket/file'
return gcs_batch_input_path
@component(
base_image="python:3.9",
packages_to_install=['google-cloud-aiplatform==1.8.0']
)
def load_ml_model(project_id: str, model: Output[Artifact]):
"""Load existing Vertex model"""
import google.cloud.aiplatform as aip
model_id = '1234'
model = aip.Model(model_name=model_id, project=project_id, location='us-central1')
@dsl.pipeline(
name="batch-pipeline", pipeline_root=PIPELINE_ROOT,
)
def pipeline(gcp_project: str):
input_data = get_input_data()
ml_model = load_ml_model(gcp_project)
gcc_aip.ModelBatchPredictOp(
project=PROJECT_ID,
job_display_name=f'test-prediction',
model=ml_model.output,
gcs_source_uris=[input_data.output], # this doesn't work
# gcs_source_uris=['gs://mybucket/output/'], # hardcoded gs uri works
gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
)
if __name__ == '__main__':
from kfp.v2 import compiler
import google.cloud.aiplatform as aip
pipeline_export_filepath = 'test-pipeline.json'
compiler.Compiler().compile(pipeline_func=pipeline,
package_path=pipeline_export_filepath)
# pipeline_params = {
# 'gcp_project': PROJECT_ID,
# }
# job = aip.PipelineJob(
# display_name='test-pipeline',
# template_path=pipeline_export_filepath,
# pipeline_root=f'gs://{PIPELINE_ROOT}',
# project=PROJECT_ID,
# parameter_values=pipeline_params,
# )
# job.run()
当运行管道抛出这个异常时运行批量预测:
details = "List of found errors: 1.Field: batch_prediction_job.model; Message: Invalid Model resource name.
所以我不确定可能出了什么问题。我尝试在笔记本中加载模型(在组件之外)并且它正确 returns.
我遇到的第二个问题是引用 GCS URI 作为从组件到批处理作业输入的输出。
input_data = get_input_data2()
gcc_aip.ModelBatchPredictOp(
project=PROJECT_ID,
job_display_name=f'test-prediction',
model=ml_model.output,
gcs_source_uris=[input_data.output], # this doesn't work
# gcs_source_uris=['gs://mybucket/output/'], # hardcoded gs uri works
gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
)
在编译期间,我得到以下异常 TypeError: Object of type PipelineParam is not JSON serializable
,但我认为这可能是 ModelBatchPredictOp 组件的问题。
再一次help/advice感谢,我从昨天开始处理这个问题,所以也许我错过了一些明显的东西。
我正在使用的库:
google-cloud-aiplatform==1.8.0
google-cloud-pipeline-components==0.2.0
kfp==1.8.10
kfp-pipeline-spec==0.1.13
kfp-server-api==1.7.1
更新
经过评论、一些研究和调整后,对于参考模型,此方法有效:
@component
def load_ml_model(project_id: str, model: Output[Artifact]):
region = 'us-central1'
model_id = '1234'
model_uid = f'projects/{project_id}/locations/{region}/models/{model_id}'
model.uri = model_uid
model.metadata['resourceName'] = model_uid
然后我就可以按预期使用它了:
batch_predict_op = gcc_aip.ModelBatchPredictOp(
project=gcp_project,
job_display_name=f'batch-prediction-test',
model=ml_model.outputs['model'],
gcs_source_uris=[input_batch_gcs_path],
gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/test'
)
更新 2
关于 GCS 路径,解决方法是在组件外部定义路径并将其作为输入参数传递,例如(缩写):
@dsl.pipeline(
name="my-pipeline",
pipeline_root=PIPELINE_ROOT,
)
def pipeline(
gcp_project: str,
region: str,
bucket: str
):
ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
gcs_prediction_input_path = f'gs://{BUCKET_NAME}/prediction_input/video_batch_prediction_input_{ts}.jsonl'
batch_input_data_op = get_input_data(gcs_prediction_input_path) # this loads input data to GCS path
batch_predict_op = gcc_aip.ModelBatchPredictOp(
project=gcp_project,
model=training_job_run_op.outputs["model"],
job_display_name='batch-prediction',
# gcs_source_uris=[batch_input_data_op.output],
gcs_source_uris=[gcs_prediction_input_path],
gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/',
).after(batch_input_data_op) # we need to add 'after' so it runs after input data is prepared since get_input_data doesn't returns anything
仍然不确定,当我 return 来自 get_input_data
组件
的 GCS 路径时,为什么它不 work/compile
很高兴您解决了大部分主要问题并找到了模型声明的解决方法。
对于您对 gcs_source_uris
的 input.output
观察,其背后的原因是因为 function/class return 的值。如果深入研究 google_cloud_pipeline_components
的 class/methods,您会发现它实现了一个结构,允许您从调用的函数的 returned 值中使用 .outputs
。
如果您查看管道组件之一的实现,您会发现它 return 是 convert_method_to_component
函数的输出数组。因此,为了在您的自定义 class/function 中实现它,您的函数应该 return 一个可以作为属性调用的值。下面是它的基本实现。
class CustomClass():
def __init__(self):
self.return_val = {'path':'custompath','desc':'a desc'}
@property
def output(self):
return self.return_val
hello = CustomClass()
print(hello.output['path'])
如果您想深入了解它,可以转到以下页面:
convert_method_to_component,也就是convert_method_to_component
的实现
Properties,python 中 属性 的基础知识。
我正在努力正确设置执行以下操作的 Vertex AI 管道:
- 从 API 读取数据并存储到 GCS 并作为批量预测的输入。
- 获取现有模型(Vertex AI 上的视频分类)
- 使用第 1 点的输入创建批量预测作业。
正如将会看到的那样,我对 Vertex Pipelines/Kubeflow 没有太多经验,因此我要求 help/advice,希望这只是一些初学者的错误。 这是我用作管道的代码的要点
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import dsl
from kfp.v2.dsl import component
from kfp.v2.dsl import (
Output,
Artifact,
Model,
)
PROJECT_ID = 'my-gcp-project'
BUCKET_NAME = "mybucket"
PIPELINE_ROOT = "{}/pipeline_root".format(BUCKET_NAME)
@component
def get_input_data() -> str:
# getting data from API, save to Cloud Storage
# return GS URI
gcs_batch_input_path = 'gs://somebucket/file'
return gcs_batch_input_path
@component(
base_image="python:3.9",
packages_to_install=['google-cloud-aiplatform==1.8.0']
)
def load_ml_model(project_id: str, model: Output[Artifact]):
"""Load existing Vertex model"""
import google.cloud.aiplatform as aip
model_id = '1234'
model = aip.Model(model_name=model_id, project=project_id, location='us-central1')
@dsl.pipeline(
name="batch-pipeline", pipeline_root=PIPELINE_ROOT,
)
def pipeline(gcp_project: str):
input_data = get_input_data()
ml_model = load_ml_model(gcp_project)
gcc_aip.ModelBatchPredictOp(
project=PROJECT_ID,
job_display_name=f'test-prediction',
model=ml_model.output,
gcs_source_uris=[input_data.output], # this doesn't work
# gcs_source_uris=['gs://mybucket/output/'], # hardcoded gs uri works
gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
)
if __name__ == '__main__':
from kfp.v2 import compiler
import google.cloud.aiplatform as aip
pipeline_export_filepath = 'test-pipeline.json'
compiler.Compiler().compile(pipeline_func=pipeline,
package_path=pipeline_export_filepath)
# pipeline_params = {
# 'gcp_project': PROJECT_ID,
# }
# job = aip.PipelineJob(
# display_name='test-pipeline',
# template_path=pipeline_export_filepath,
# pipeline_root=f'gs://{PIPELINE_ROOT}',
# project=PROJECT_ID,
# parameter_values=pipeline_params,
# )
# job.run()
当运行管道抛出这个异常时运行批量预测:
details = "List of found errors: 1.Field: batch_prediction_job.model; Message: Invalid Model resource name.
所以我不确定可能出了什么问题。我尝试在笔记本中加载模型(在组件之外)并且它正确 returns.
我遇到的第二个问题是引用 GCS URI 作为从组件到批处理作业输入的输出。
input_data = get_input_data2()
gcc_aip.ModelBatchPredictOp(
project=PROJECT_ID,
job_display_name=f'test-prediction',
model=ml_model.output,
gcs_source_uris=[input_data.output], # this doesn't work
# gcs_source_uris=['gs://mybucket/output/'], # hardcoded gs uri works
gcs_destination_output_uri_prefix=f'gs://{PIPELINE_ROOT}/prediction_output/'
)
在编译期间,我得到以下异常 TypeError: Object of type PipelineParam is not JSON serializable
,但我认为这可能是 ModelBatchPredictOp 组件的问题。
再一次help/advice感谢,我从昨天开始处理这个问题,所以也许我错过了一些明显的东西。
我正在使用的库:
google-cloud-aiplatform==1.8.0
google-cloud-pipeline-components==0.2.0
kfp==1.8.10
kfp-pipeline-spec==0.1.13
kfp-server-api==1.7.1
更新 经过评论、一些研究和调整后,对于参考模型,此方法有效:
@component
def load_ml_model(project_id: str, model: Output[Artifact]):
region = 'us-central1'
model_id = '1234'
model_uid = f'projects/{project_id}/locations/{region}/models/{model_id}'
model.uri = model_uid
model.metadata['resourceName'] = model_uid
然后我就可以按预期使用它了:
batch_predict_op = gcc_aip.ModelBatchPredictOp(
project=gcp_project,
job_display_name=f'batch-prediction-test',
model=ml_model.outputs['model'],
gcs_source_uris=[input_batch_gcs_path],
gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/test'
)
更新 2 关于 GCS 路径,解决方法是在组件外部定义路径并将其作为输入参数传递,例如(缩写):
@dsl.pipeline(
name="my-pipeline",
pipeline_root=PIPELINE_ROOT,
)
def pipeline(
gcp_project: str,
region: str,
bucket: str
):
ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
gcs_prediction_input_path = f'gs://{BUCKET_NAME}/prediction_input/video_batch_prediction_input_{ts}.jsonl'
batch_input_data_op = get_input_data(gcs_prediction_input_path) # this loads input data to GCS path
batch_predict_op = gcc_aip.ModelBatchPredictOp(
project=gcp_project,
model=training_job_run_op.outputs["model"],
job_display_name='batch-prediction',
# gcs_source_uris=[batch_input_data_op.output],
gcs_source_uris=[gcs_prediction_input_path],
gcs_destination_output_uri_prefix=f'gs://{BUCKET_NAME}/prediction_output/',
).after(batch_input_data_op) # we need to add 'after' so it runs after input data is prepared since get_input_data doesn't returns anything
仍然不确定,当我 return 来自 get_input_data
组件
很高兴您解决了大部分主要问题并找到了模型声明的解决方法。
对于您对 gcs_source_uris
的 input.output
观察,其背后的原因是因为 function/class return 的值。如果深入研究 google_cloud_pipeline_components
的 class/methods,您会发现它实现了一个结构,允许您从调用的函数的 returned 值中使用 .outputs
。
如果您查看管道组件之一的实现,您会发现它 return 是 convert_method_to_component
函数的输出数组。因此,为了在您的自定义 class/function 中实现它,您的函数应该 return 一个可以作为属性调用的值。下面是它的基本实现。
class CustomClass():
def __init__(self):
self.return_val = {'path':'custompath','desc':'a desc'}
@property
def output(self):
return self.return_val
hello = CustomClass()
print(hello.output['path'])
如果您想深入了解它,可以转到以下页面:
convert_method_to_component,也就是
的实现convert_method_to_component
Properties,python 中 属性 的基础知识。