TensorFlow:帮助创建服务输入函数
TensorFlow: Help creating a serving input function
我是 TensorFlow Serving 的新手。我使用估算器训练了一个广泛而深入的模型。现在我想为我的模型服务。我创建我的服务输入接收器函数并保存模型。当我尝试使用保存的模型进行预测时,我总是收到 InternalError: Unable to get element as bytes
。我真的不明白服务函数中应该包含什么或者我应该发送什么格式类型。谁能先给我解释一下服务函数的概念以及如何正确创建函数。
可重现的例子:https://github.com/dangz90/wide_and_deep_debugging/blob/master/wide%20and%20deep%20debug.ipynb
服务功能:
def serving_input_receiver_fn():
serialized_tf_example = tf.placeholder(dtype=tf.string,
shape=[],
name='input_tensor')
receiver_tensors = {'inputs': serialized_tf_example}
parsed_features = tf.parse_single_example(
serialized_tf_example,
# Defaults are not specified since both keys are required.
features={
'var1': tf.FixedLenFeature(shape=[1], dtype=tf.string),
'var2': tf.FixedLenFeature(shape=[1], dtype=tf.string),
'var3': tf.FixedLenFeature(shape=[1], dtype=tf.string),
'var4': tf.VarLenFeature(dtype=tf.string),
})
return tf.estimator.export.ServingInputReceiver(parsed_features, receiver_tensors)
estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
estimator_predictor({ 'inputs': examples.SerializeToString() })
我试过发送 pandas、示例等。但真的不知道数据应该是什么格式。对于我的训练,输入的数据被保存为 tfrecord,然后作为数据集加载。 注意:我如果直接使用模型m.predict()
我能够得到正确的预测
完整错误:
--------------------------------------------------------------------------- InternalError Traceback (most recent call last) /databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args) 1333 try:
-> 1334 return fn(*args) 1335 except errors.OpError as e:
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run_fn(feed_dict, fetch_list, target_list, options, run_metadata) 1318 return self._call_tf_sessionrun(
-> 1319 options, feed_dict, fetch_list, target_list, run_metadata) 1320
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _call_tf_sessionrun(self, options, feed_dict, fetch_list, target_list, run_metadata) 1406 self._session, options, feed_dict, fetch_list, target_list,
-> 1407 run_metadata) 1408
InternalError: Unable to get element as bytes.
During handling of the above exception, another exception occurred:
InternalError Traceback (most recent call last) <command-364073753108128> in <module>()
3
4 estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
----> 5 estimator_predictor({ 'inputs': examples_ })
/databricks/python/lib/python3.6/site-packages/tensorflow/contrib/predictor/predictor.py in __call__(self, input_dict)
75 if value is not None:
76 feed_dict[self.feed_tensors[key]] = value
---> 77 return self._session.run(fetches=self.fetch_tensors, feed_dict=feed_dict)
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
674 feed_dict=feed_dict,
675 options=options,
--> 676 run_metadata=run_metadata)
677
678 def run_step_fn(self, step_fn):
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata) 1169 feed_dict=feed_dict, 1170 options=options,
-> 1171 run_metadata=run_metadata) 1172 except _PREEMPTION_ERRORS as e: 1173 logging.info('An error was raised. This may be due to a preemption in '
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs) 1268 raise six.reraise(*original_exc_info) 1269 else:
-> 1270 raise six.reraise(*original_exc_info) 1271 1272
/databricks/python/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs) 1253 def run(self, *args,
**kwargs): 1254 try:
-> 1255 return self._sess.run(*args, **kwargs) 1256 except _PREEMPTION_ERRORS: 1257 raise
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata) 1325 feed_dict=feed_dict, 1326 options=options,
-> 1327 run_metadata=run_metadata) 1328 1329 for hook in self._hooks:
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs) 1089 1090 def run(self, *args,
**kwargs):
-> 1091 return self._sess.run(*args, **kwargs) 1092 1093 def run_step_fn(self, step_fn, raw_session, run_with_hooks):
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
927 try:
928 result = self._run(None, fetches, feed_dict, options_ptr,
--> 929 run_metadata_ptr)
930 if run_metadata:
931 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata) 1150 if final_fetches or final_targets or (handle and feed_dict_tensor): 1151 results = self._do_run(handle, final_targets, final_fetches,
-> 1152 feed_dict_tensor, options, run_metadata) 1153 else: 1154 results = []
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata) 1326 if handle is None: 1327 return self._do_call(_run_fn, feeds, fetches, targets, options,
-> 1328 run_metadata) 1329 else: 1330 return self._do_call(_prun_fn, handle, feeds, fetches)
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args) 1346 pass 1347 message = error_interpolation.interpolate(message, self._graph)
-> 1348 raise type(e)(node_def, op, message) 1349 1350 def _extend_graph(self):
InternalError: Unable to get element as bytes.
以下是示例在序列化之前的样子。
features {
feature {
key: "var4"
value {
bytes_list {
value: "43"
value: "65"
value: "89"
value: "02"
}
}
}
feature {
key: "var3"
value {
bytes_list {
value: "0123194"
}
}
}
feature {
key: "var2"
value {
bytes_list {
value: "1243"
}
}
}
feature {
key: "var1"
value {
bytes_list {
value: "54"
}
}
}
}
要获取示例,我 运行 以下脚本:
def serialise_input(data):
dict_feature = {}
for e in data.items():
if e[0] == "var4":
dict_feature.update({e[0]: Feature(bytes_list=BytesList(value=[m.encode('utf-8') for m in e[1]]))})
else:
dict_feature.update({e[0]: Feature(bytes_list=BytesList( value=[e[1].encode()] ))})
example = Example(features=Features(feature=dict_feature))
return example.SerializeToString()
# Serialize Input
raw_data = test.toPandas().dropna().iloc[0,:-1]
examples_ = serialise_input(raw_data)
提前致谢。
您的 serialized_tf_example
占位符似乎有 shape=[]
,这是一个示例。您应该传递单个 tf.train.Example,序列化为字符串:
# I assume here examples_ is a list of tf.train.Examples
example = examples_[0].SerializeToString()
estimator_predictor({ 'inputs': example })
如果您想提供一批示例而不是单个序列化示例,则需要使用 shape=[None]
和 tf.io.parse_example
。
此外,您提供的示例应该定义了您在特征字典中引用的特征(例如 var1
),以便它们可以被正确解析。
服务函数指定您的模型在预测时应如何接收其输入 - 当您将经过训练的 TensorFlow 图/模型导出为 SavedModel 时。而不是像在培训中那样使用占位符或 tf.data.Dataset 并需要有状态 python / TF 运行 时间执行的活动 TF 会话,您希望您的 SavedModel 能够被序列化和写入到磁盘,以便您可以使用 TF Serving 或 运行 将其部署到移动设备等
所以你导出这个 SavedModel - 服务输入是里面定义如何解析它发送的请求然后将其连接到你的模型的部分你可能想要发送它们的方式是 tf.train.Examples协议缓冲区(序列化为字符串,以便可以在 RPC 中发送)。然后你将它们解析成一个特征字典,这样你的估算器就可以像理解你的训练数据一样理解它们。
问题出在我构建输入和输入服务函数的方式上。这是解决方案:
feat_name_type = {'var1':str, 'var2':str, 'var3':str, 'var4':list}
def input_fn(df):
examples = [None] * len(df)
for i, sample in df.iterrows():
ex = tf.train.Example()
for feat_name, feat_type in feat_name_type.items():
feat = ex.features.feature[feat_name]
if feat_type == int:
feat.int64_list.value.extend([sample[feat_name]])
elif feat_type == float:
feat.float_list.value.extend([sample[feat_name]])
elif feat_type == str:
feat.bytes_list.value.extend([sample[feat_name].encode()])
elif feat_type == list:
feat.bytes_list.value.extend([s.encode() for s in sample[feat_name]])
examples[i] = ex.SerializeToString()
return {"inputs": examples}
'''Service input function'''
tf_feat_cols = deep_columns + wide_columns #The feature columns created for feeding into the model
serve_rcvr_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
tf.feature_column.make_parse_example_spec(tf_feat_cols)
)
rcvr_fn_map = {
tf.estimator.ModeKeys.PREDICT: serve_rcvr_fn,
}
我是 TensorFlow Serving 的新手。我使用估算器训练了一个广泛而深入的模型。现在我想为我的模型服务。我创建我的服务输入接收器函数并保存模型。当我尝试使用保存的模型进行预测时,我总是收到 InternalError: Unable to get element as bytes
。我真的不明白服务函数中应该包含什么或者我应该发送什么格式类型。谁能先给我解释一下服务函数的概念以及如何正确创建函数。
可重现的例子:https://github.com/dangz90/wide_and_deep_debugging/blob/master/wide%20and%20deep%20debug.ipynb
服务功能:
def serving_input_receiver_fn():
serialized_tf_example = tf.placeholder(dtype=tf.string,
shape=[],
name='input_tensor')
receiver_tensors = {'inputs': serialized_tf_example}
parsed_features = tf.parse_single_example(
serialized_tf_example,
# Defaults are not specified since both keys are required.
features={
'var1': tf.FixedLenFeature(shape=[1], dtype=tf.string),
'var2': tf.FixedLenFeature(shape=[1], dtype=tf.string),
'var3': tf.FixedLenFeature(shape=[1], dtype=tf.string),
'var4': tf.VarLenFeature(dtype=tf.string),
})
return tf.estimator.export.ServingInputReceiver(parsed_features, receiver_tensors)
estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
estimator_predictor({ 'inputs': examples.SerializeToString() })
我试过发送 pandas、示例等。但真的不知道数据应该是什么格式。对于我的训练,输入的数据被保存为 tfrecord,然后作为数据集加载。 注意:我如果直接使用模型m.predict()
我能够得到正确的预测
完整错误:
--------------------------------------------------------------------------- InternalError Traceback (most recent call last) /databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args) 1333 try:
-> 1334 return fn(*args) 1335 except errors.OpError as e:
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run_fn(feed_dict, fetch_list, target_list, options, run_metadata) 1318 return self._call_tf_sessionrun(
-> 1319 options, feed_dict, fetch_list, target_list, run_metadata) 1320
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _call_tf_sessionrun(self, options, feed_dict, fetch_list, target_list, run_metadata) 1406 self._session, options, feed_dict, fetch_list, target_list,
-> 1407 run_metadata) 1408
InternalError: Unable to get element as bytes.
During handling of the above exception, another exception occurred:
InternalError Traceback (most recent call last) <command-364073753108128> in <module>()
3
4 estimator_predictor = tf.contrib.predictor.from_estimator(m, serving_input_receiver_fn)
----> 5 estimator_predictor({ 'inputs': examples_ })
/databricks/python/lib/python3.6/site-packages/tensorflow/contrib/predictor/predictor.py in __call__(self, input_dict)
75 if value is not None:
76 feed_dict[self.feed_tensors[key]] = value
---> 77 return self._session.run(fetches=self.fetch_tensors, feed_dict=feed_dict)
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata)
674 feed_dict=feed_dict,
675 options=options,
--> 676 run_metadata=run_metadata)
677
678 def run_step_fn(self, step_fn):
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata) 1169 feed_dict=feed_dict, 1170 options=options,
-> 1171 run_metadata=run_metadata) 1172 except _PREEMPTION_ERRORS as e: 1173 logging.info('An error was raised. This may be due to a preemption in '
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs) 1268 raise six.reraise(*original_exc_info) 1269 else:
-> 1270 raise six.reraise(*original_exc_info) 1271 1272
/databricks/python/lib/python3.6/site-packages/six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs) 1253 def run(self, *args,
**kwargs): 1254 try:
-> 1255 return self._sess.run(*args, **kwargs) 1256 except _PREEMPTION_ERRORS: 1257 raise
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, fetches, feed_dict, options, run_metadata) 1325 feed_dict=feed_dict, 1326 options=options,
-> 1327 run_metadata=run_metadata) 1328 1329 for hook in self._hooks:
/databricks/python/lib/python3.6/site-packages/tensorflow/python/training/monitored_session.py in run(self, *args, **kwargs) 1089 1090 def run(self, *args,
**kwargs):
-> 1091 return self._sess.run(*args, **kwargs) 1092 1093 def run_step_fn(self, step_fn, raw_session, run_with_hooks):
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in run(self, fetches, feed_dict, options, run_metadata)
927 try:
928 result = self._run(None, fetches, feed_dict, options_ptr,
--> 929 run_metadata_ptr)
930 if run_metadata:
931 proto_data = tf_session.TF_GetBuffer(run_metadata_ptr)
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _run(self, handle, fetches, feed_dict, options, run_metadata) 1150 if final_fetches or final_targets or (handle and feed_dict_tensor): 1151 results = self._do_run(handle, final_targets, final_fetches,
-> 1152 feed_dict_tensor, options, run_metadata) 1153 else: 1154 results = []
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_run(self, handle, target_list, fetch_list, feed_dict, options, run_metadata) 1326 if handle is None: 1327 return self._do_call(_run_fn, feeds, fetches, targets, options,
-> 1328 run_metadata) 1329 else: 1330 return self._do_call(_prun_fn, handle, feeds, fetches)
/databricks/python/lib/python3.6/site-packages/tensorflow/python/client/session.py in _do_call(self, fn, *args) 1346 pass 1347 message = error_interpolation.interpolate(message, self._graph)
-> 1348 raise type(e)(node_def, op, message) 1349 1350 def _extend_graph(self):
InternalError: Unable to get element as bytes.
以下是示例在序列化之前的样子。
features {
feature {
key: "var4"
value {
bytes_list {
value: "43"
value: "65"
value: "89"
value: "02"
}
}
}
feature {
key: "var3"
value {
bytes_list {
value: "0123194"
}
}
}
feature {
key: "var2"
value {
bytes_list {
value: "1243"
}
}
}
feature {
key: "var1"
value {
bytes_list {
value: "54"
}
}
}
}
要获取示例,我 运行 以下脚本:
def serialise_input(data):
dict_feature = {}
for e in data.items():
if e[0] == "var4":
dict_feature.update({e[0]: Feature(bytes_list=BytesList(value=[m.encode('utf-8') for m in e[1]]))})
else:
dict_feature.update({e[0]: Feature(bytes_list=BytesList( value=[e[1].encode()] ))})
example = Example(features=Features(feature=dict_feature))
return example.SerializeToString()
# Serialize Input
raw_data = test.toPandas().dropna().iloc[0,:-1]
examples_ = serialise_input(raw_data)
提前致谢。
您的 serialized_tf_example
占位符似乎有 shape=[]
,这是一个示例。您应该传递单个 tf.train.Example,序列化为字符串:
# I assume here examples_ is a list of tf.train.Examples
example = examples_[0].SerializeToString()
estimator_predictor({ 'inputs': example })
如果您想提供一批示例而不是单个序列化示例,则需要使用 shape=[None]
和 tf.io.parse_example
。
此外,您提供的示例应该定义了您在特征字典中引用的特征(例如 var1
),以便它们可以被正确解析。
服务函数指定您的模型在预测时应如何接收其输入 - 当您将经过训练的 TensorFlow 图/模型导出为 SavedModel 时。而不是像在培训中那样使用占位符或 tf.data.Dataset 并需要有状态 python / TF 运行 时间执行的活动 TF 会话,您希望您的 SavedModel 能够被序列化和写入到磁盘,以便您可以使用 TF Serving 或 运行 将其部署到移动设备等
所以你导出这个 SavedModel - 服务输入是里面定义如何解析它发送的请求然后将其连接到你的模型的部分你可能想要发送它们的方式是 tf.train.Examples协议缓冲区(序列化为字符串,以便可以在 RPC 中发送)。然后你将它们解析成一个特征字典,这样你的估算器就可以像理解你的训练数据一样理解它们。
问题出在我构建输入和输入服务函数的方式上。这是解决方案:
feat_name_type = {'var1':str, 'var2':str, 'var3':str, 'var4':list}
def input_fn(df):
examples = [None] * len(df)
for i, sample in df.iterrows():
ex = tf.train.Example()
for feat_name, feat_type in feat_name_type.items():
feat = ex.features.feature[feat_name]
if feat_type == int:
feat.int64_list.value.extend([sample[feat_name]])
elif feat_type == float:
feat.float_list.value.extend([sample[feat_name]])
elif feat_type == str:
feat.bytes_list.value.extend([sample[feat_name].encode()])
elif feat_type == list:
feat.bytes_list.value.extend([s.encode() for s in sample[feat_name]])
examples[i] = ex.SerializeToString()
return {"inputs": examples}
'''Service input function'''
tf_feat_cols = deep_columns + wide_columns #The feature columns created for feeding into the model
serve_rcvr_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
tf.feature_column.make_parse_example_spec(tf_feat_cols)
)
rcvr_fn_map = {
tf.estimator.ModeKeys.PREDICT: serve_rcvr_fn,
}