如何将适合的变压器保存到 blob 中,以便您的预测管道可以在 AML 服务中使用它?

How to save your fitted transformer into blob, so your prediction pipeline can use it in AML Service?

我正在 Azure 机器学习服务上构建数据转换和训练管道。我想将我的拟合转换器(例如 tf-idf)保存到 blob,以便我的预测管道稍后可以访问它。

transformed_data = PipelineData("transformed_data", 
                               datastore = default_datastore,
                               output_path_on_compute="my_project/tfidf")

step_tfidf = PythonScriptStep(name = "tfidf_step",
                              script_name = "transform.py",
                              arguments = ['--input_data', blob_train_data, 
                                           '--output_folder', transformed_data],
                              inputs = [blob_train_data],
                              outputs = [transformed_data],
                              compute_target = aml_compute,
                              source_directory = project_folder,
                              runconfig = run_config,
                              allow_reuse = False)

以上代码将转换器保存到当前 运行 的文件夹中,该文件夹在每个 运行 期间动态生成。

我想将转换器保存到 blob 上的固定位置,以便稍后在调用预测管道时可以访问它。

我尝试使用 DataReference class 的实例作为 PythonScriptStep 输出,但它导致错误: ValueError: Unexpected output type: <class 'azureml.data.data_reference.DataReference'>

这是因为PythonScriptStep只接受PipelineDataOutputPortBinding对象作为输出。

我怎样才能保存我安装的变压器,以便以后可以通过任何任意过程(例如我的预测管道)访问它?

这可能不够灵活,无法满足您的需求(此外,我还没有对此进行测试),但如果您使用 scikit-learn,一种可能性是将 tf-idf/transformation 步骤包含在scikit-learn Pipeline 对象并将其注册到您的工作区。

因此您的训练脚本将包含:

pipeline = Pipeline([
    ('vectorizer', TfidfVectorizer(stop_words = list(text.ENGLISH_STOP_WORDS))),
    ('classifier', SGDClassifier()
])

pipeline.fit(train[label].values, train[pred_label].values)

# Serialize the pipeline
joblib.dump(value=pipeline, filename='outputs/model.pkl')

并且您的实验提交脚本将包含

run = exp.submit(src)
run.wait_for_completion(show_output = True)
model = run.register_model(model_name='my_pipeline', model_path='outputs/model.pkl')

然后,您可以使用已注册的 "model" 并将其部署为 explained in the documentation 服务,方法是通过

将其加载到评分脚本中
model_path = Model.get_model_path('my_pipeline')
# deserialize the model file back into a sklearn model
model = joblib.load(model_path) 

然而,这会在您的管道中烘焙转换,因此不会像您要求的那样模块化...

另一个解决方案是将 DataReference 作为输入传递给您的 PythonScriptStep

然后在 transform.py 中,您可以将此 DataReference 视为命令行参数。

您可以解析它并像使用任何常规路径一样使用它来保存您的矢量化程序。

例如你可以:

step_tfidf = PythonScriptStep(name = "tfidf_step",
                              script_name = "transform.py",
                              arguments = ['--input_data', blob_train_data, 
                                           '--output_folder', transformed_data,
                                           '--transformer_path', trained_transformer_path],
                              inputs = [blob_train_data, trained_transformer_path],
                              outputs = [transformed_data],
                              compute_target = aml_compute,
                              source_directory = project_folder,
                              runconfig = run_config,
                              allow_reuse = False)

然后在你的脚本中(上面例子中的transform.py)你可以例如:

import argparse
import joblib as jbl
import os

from sklearn.feature_extraction.text import TfidfVectorizer

parser = argparse.ArgumentParser()
parser.add_argument('--transformer_path', dest="transformer_path", required=True)
args = parser.parse_args()

tfidf = ### HERE CREATE AND TRAIN YOUR VECTORIZER ###

vect_filename = os.path.join(args.transformer_path, 'my_vectorizer.jbl')


额外:第三种方法是将矢量化器注册为工作区中的另一个模型。然后您可以像使用任何其他已注册模型一样使用它。 (尽管此选项不涉及显式写入 blob - 如上述问题所述)

另一种选择是使用 DataTransferStep 并使用它将输出复制到 "known location." This notebook 有使用 DataTransferStep 将数据从各种支持的数据存储复制和复制到各种支持的数据存储的示例。

from azureml.data.data_reference import DataReference
from azureml.exceptions import ComputeTargetException
from azureml.core.compute import ComputeTarget, DataFactoryCompute
from azureml.pipeline.steps import DataTransferStep

blob_datastore = Datastore.get(ws, "workspaceblobstore")

blob_data_ref = DataReference(
    datastore=blob_datastore,
    data_reference_name="knownloaction",
    path_on_datastore="knownloaction")

data_factory_name = 'adftest'

def get_or_create_data_factory(workspace, factory_name):
    try:
        return DataFactoryCompute(workspace, factory_name)
    except ComputeTargetException as e:
        if 'ComputeTargetNotFound' in e.message:
            print('Data factory not found, creating...')
            provisioning_config = DataFactoryCompute.provisioning_configuration()
            data_factory = ComputeTarget.create(workspace, factory_name, provisioning_config)
            data_factory.wait_for_completion()
            return data_factory
        else:
            raise e

data_factory_compute = get_or_create_data_factory(ws, data_factory_name)

# Assuming output data is your output from the step that you want to copy

transfer_to_known_location = DataTransferStep(
    name="transfer_to_known_location",
    source_data_reference=[output_data],
    destination_data_reference=blob_data_ref,
    compute_target=data_factory_compute
    )

from azureml.pipeline.core import Pipeline
from azureml.core import Workspace, Experiment

pipeline_01 = Pipeline(
    description="transfer_to_known_location",
    workspace=ws,
    steps=[transfer_to_known_location])

pipeline_run_01 = Experiment(ws, "transfer_to_known_location").submit(pipeline_01)
pipeline_run_01.wait_for_completion()