将 TF2 keras 模型的 signaturedef 映射到 TFX Pipeline 中的 TF Serving classify/predict/regression API 的最佳做法是什么?

What is best practice for mapping a TF2 keras model's signaturedef to TF Serving classify/predict/regression API in TFX Pipeline?

我们正在 Airflow 上构建一个自动化的 TFX 管道,并且我们的模型基于 Keras Tutorial。我们保存keras模型如下:

model.save(fn_args.serving_model_dir, save_format='tf',
           signatures=signatures,
           )

signatures 字典是:

    signatures = {
    'serving_default':
        _get_serve_tf_examples_fn(model, tf_transform_output) \
            .get_concrete_function(
            tf.TensorSpec(
                shape=[None],
                dtype=tf.string,
                name='examples'
            )
        ),

    'prediction':
        get_request_url_fn(model, tf_transform_output) \
            .get_concrete_function(
            tf.TensorSpec(
                shape=[None],
                dtype=tf.string,
                name='prediction_examples'
            )
        ),
}

_get_serve_tf_examples_fn 的目的是为 TFX 评估器组件提供额外的张量,模型中未使用的张量,用于模型评估目的。就像上面的 Keras TFX 教程一样:

def _get_serve_tf_examples_fn(model, tf_transform_output):
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(_LABEL_KEY)

        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        transformed_features.pop(_transformed_name(_LABEL_KEY))
        return model(transformed_features)

    return serve_tf_examples_fn

上述模型 'interface' 接受 TFX Evaluator 组件 (TFMA) 所需的 TF.Examples。

但是,对于 TF 服务,我们希望能够将 1 个原始字符串 - 只是一个 url - 发送到 TF 服务预测器 REST API 并获得它的预测分数。当前 get_request_url_fn 是:

def get_request_url_fn(model, tf_transform_output):
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_request_url_fn(request_url):
        feature_spec = tf_transform_output.raw_feature_spec()
        # Model requires just one of the features made available to other TFX components
        # Throw away the rest and leave just 'request_url'
        feature_spec = {'request_url': feature_spec['request_url']}

        parsed_features = tf.io.parse_example(request_url, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        transformed_features.pop(_transformed_name(_LABEL_KEY))
        return model(transformed_features)

    return serve_request_url_fn

虽然这种方法仍然需要以 TF.Example 的形式输入。代表客户需要大量的开销。即,import tensorflow。该代码确实有效:

url = f'http://{server}:8501/v1/models/wrcv3:predict'
headers = {"content-type": "application/json"}
url_request = b'index'
example = tf.train.Example(
            features=tf.train.Features(
              feature={"request_url": 
                          tf.train.Feature(bytes_list=tf.train.BytesList(value=[url_request]))
                      }
                )
            )
print(example)


data = {
  "signature_name":"prediction",
  "instances":[
    {
       "prediction_examples":{"b64": base64.b64encode(example.SerializeToString()).decode('utf-8')}
    }
  ]
}
data = json.dumps(data)
print(data)
json_response = requests.post(url, data=data, headers=headers)
print(json_response.content)
print(json_response.json)

返回结果:

features {
  feature {
    key: "request_url"
    value {
      bytes_list {
        value: "index"
      }
    }
  }
}

{"signature_name": "prediction", "instances": [{"prediction_examples": {"b64": "ChoKGAoLcmVxdWVzdF91cmwSCQoHCgVpbmRleA=="}}]}
b'{\n    "predictions": [[0.897708654]\n    ]\n}'
<bound method Response.json of <Response [200]>>

当我们提交一个 base64 编码的字符串代替 TF.Example 时,它显然失败了:

url = f'http://{server}:8501/v1/models/wrcv3:predict'
headers = {"content-type": "application/json"}
url_request = b'index.html'


data = {
  "signature_name":"prediction",
  "instances":[
    {
       "prediction_examples":{"b64": base64.b64encode(url_request).decode('UTF-8')}
    }
  ]
}
data = json.dumps(data)
print(data)
json_response = requests.post(url, data=data, headers=headers)
print(json_response.content)
print(json_response.json)

返回:

{"signature_name": "prediction", "instances": [{"prediction_examples": {"b64": "aW5kZXguaHRtbA=="}}]}
b'{ "error": "Could not parse example input, value: \\'index.html\\'\n\t [[{{node ParseExample/ParseExampleV2}}]]" }'
<bound method Response.json of <Response [400]>>

问题是:signaturedef/signature 应该是什么样子来接受原始字符串?如果不喜欢 get_request_url_fn 。当然,客户端不应该为了发出请求而加载 TF ?

TFX 网站本身详细记录了 classify/predict/regression here 的 3 个 protobuf,但(对我而言)如何使用这 3 个 protobuf 来完成我们需要的映射并不直观。

在此先表示深深的谢意。

根据您的代码,函数 serve_request_url_fn 的输入是一个密集张量,但您的变换图的输入可能是一个稀疏张量。

函数tf.io.parse_example知道如何将你的tf.Example反序列化为稀疏张量,但如果你想发送一个张量,而不序列化它,那么你应该手动将它转换为稀疏张量并停止使用 tf.io.parse 函数。

例如:

@tf.function(input_signature=[tf.TensorSpec(shape=(None), dtype=tf.string, name='examples')])
def serve_request_url_fn(self, request_url):
    request_url_sp_tensor = tf.sparse.SparseTensor(
        indices=[[0, 0]],
        values=request_url,
        dense_shape=(1, 1)
    )
    parsed_features = {
        'request_url': request_url_sp_tensor,
    }
    transformed_features = self.model.tft_example_layer(parsed_features)
    transformed_features.pop(_transformed_name(_LABEL_KEY))
    return self.model(transformed_features)