如何从 TFX BulkInferrer 获取数据框或数据库写入?
How do I get a dataframe or database write from TFX BulkInferrer?
我是 TFX 的新手,但有一个可以通过 BulkInferrer 使用的明显有效的 ML 管道。这似乎只以 Protobuf 格式生成输出,但由于我是 运行 批量推理,所以我想将结果通过管道传输到数据库。 (数据库输出似乎应该是批量推理的默认输出,因为批量推理和数据库访问都利用了并行化……但 Protobuf 是按记录的序列化格式。)
我想我可以使用类似 Parquet-Avro-Protobuf to do the conversion (though that's in Java and the rest of the pipeline's in Python), or I could write something myself to consume all the protobuf messages one-by-one, convert them into JSON, deserialize the JSON into a list of dicts, and load the dict into a Pandas DataFrame, or store it as a bunch of key-value pairs which I treat like a single-use DB... but that sounds like a lot of work and pain involving parallelization and optimization for a very common use case. The top-level Protobuf message definition is Tensorflow's PredictionLog 的东西。
这 必须 是一个常见的用例,因为 TensorFlowModelAnalytics 的功能类似于 this one 消耗 Pandas 个数据帧。我宁愿能够直接写入数据库(最好是 Google BigQuery)或 Parquet 文件(因为 Parquet / Spark 似乎比 Pandas 更好地并行化),而且,那些看起来像他们应该是常见的用例,但我还没有找到任何例子。也许我使用了错误的搜索词?
我还看了 PredictExtractor, since "extracting predictions" sounds close to what I want... but the official documentation appears silent on how that class is supposed to be used. I thought TFTransformOutput 听起来像一个很有前途的动词,但它是一个名词。
我显然遗漏了一些基本的东西。有没有人想将 BulkInferrer 结果存储在数据库中的原因?是否有允许我将结果写入数据库的配置选项?也许我想添加一个 ParquetIO or BigQueryIO instance to the TFX pipeline? (TFX docs say it uses Beam "under the hood" 但这并没有说明我应该如何一起使用它们。)但是这些文档中的语法看起来与我的 TFX 代码完全不同,我不确定它们是否'重新兼容?
帮忙?
(从相关问题中复制以提高可见性)
经过一番挖掘,这里有一个替代方法,它假设事先不了解 feature_spec
。执行以下操作:
- 通过向组件构造添加 output_example_spec,将
BulkInferrer
设置为写入 output_examples
而不是 inference_result
。
- 在
BulkInferrer
之后的主管道中添加一个 StatisticsGen
和一个 SchemaGen
组件,以生成上述 output_examples
的架构
- 使用
SchemaGen
和 BulkInferrer
中的工件读取 TFRecords 并执行任何必要的操作。
bulk_inferrer = BulkInferrer(
....
output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
predict_output=bulk_inferrer_pb2.PredictOutput(
output_columns=[bulk_inferrer_pb2.PredictOutputCol(
output_key='original_label_name',
output_column='output_label_column_name', )]))]
))
statistics = StatisticsGen(
examples=bulk_inferrer.outputs.output_examples
)
schema = SchemaGen(
statistics=statistics.outputs.output,
)
之后,可以进行以下操作:
import tensorflow as tf
from tfx.utils import io_utils
from tensorflow_transform.tf_metadata import schema_utils
# read schema from SchemaGen
schema_path = '/path/to/schemagen/schema.pbtxt'
schema_proto = io_utils.SchemaReader().read(schema_path)
spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec
# read inferred results
data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')
# parse dataset with spec
def parse(raw_record):
return tf.io.parse_example(raw_record, spec)
dataset = dataset.map(parse)
在这一点上,数据集就像任何其他已解析的数据集一样,因此编写 CSV 或 BigQuery table 或从那里编写的任何东西都是微不足道的。它确实帮助了我们ZenML with our BatchInferencePipeline。
在这里回答我自己的问题以记录我们所做的事情,尽管我认为下面@Hamza Tahir 的回答客观上更好。这可能会为其他需要更改开箱即用 TFX 组件操作的情况提供一个选项。虽然它很老套:
我们复制并编辑了文件 tfx/components/bulk_inferrer/executor.py,在 _run_model_inference()
方法的内部管道中替换了这个转换:
| 'WritePredictionLogs' >> beam.io.WriteToTFRecord(
os.path.join(inference_result.uri, _PREDICTION_LOGS_FILE_NAME),
file_name_suffix='.gz',
coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog)))
这个:
| 'WritePredictionLogsBigquery' >> beam.io.WriteToBigQuery(
'our_project:namespace.TableName',
schema='SCHEMA_AUTODETECT',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
custom_gcs_temp_location='gs://our-storage-bucket/tmp',
temp_file_format='NEWLINE_DELIMITED_JSON',
ignore_insert_ids=True,
)
(这是有效的,因为当您导入 BulkInferrer 组件时,每个节点的工作都会分包给工作节点上的这些执行程序 运行,并且 TFX 将自己的库复制到这些节点上。它不会不过,不要从用户 space 库中复制 所有内容,这就是为什么我们不能只继承 BulkInferrer 并导入我们的自定义版本。)
我们必须确保 'our_project:namespace.TableName'
处的 table 具有与模型输出兼容的模式,但不必将该模式转换为 JSON / AVRO。
从理论上讲,我的团队想用围绕此构建的 TFX 发出拉取请求,但目前我们正在对几个关键参数进行硬编码,没有时间将其变为现实public/生产状态。
我来晚了一点,但这是我用于此任务的一些代码:
import tensorflow as tf
from tensorflow_serving.apis import prediction_log_pb2
import pandas as pd
def parse_prediction_logs(inference_filenames: List[Text]): -> pd.DataFrame
"""
Args:
inference files: tf.io.gfile.glob(Inferrer artifact uri)
Returns:
a dataframe of userids, predictions, and features
"""
def parse_log(pbuf):
# parse the protobuf
message = prediction_log_pb2.PredictionLog()
message.ParseFromString(pbuf)
# my model produces scores and classes and I extract the topK classes
predictions = [x.decode() for x in (message
.predict_log
.response
.outputs['output_2']
.string_val
)[:10]]
# here I parse the input tf.train.Example proto
inputs = tf.train.Example()
inputs.ParseFromString(message
.predict_log
.request
.inputs['input_1'].string_val[0]
)
# you can pull out individual features like this
uid = inputs.features.feature["userId"].bytes_list.value[0].decode()
feature1 = [
x.decode() for x in inputs.features.feature["feature1"].bytes_list.value
]
feature2 = [
x.decode() for x in inputs.features.feature["feature2"].bytes_list.value
]
return (uid, predictions, feature1, feature2)
return pd.DataFrame(
[parse_log(x) for x in
tf.data.TFRecordDataset(inference_filenames, compression_type="GZIP").as_numpy_iterator()
], columns = ["userId", "predictions", "feature1", "feature2"]
)
我是 TFX 的新手,但有一个可以通过 BulkInferrer 使用的明显有效的 ML 管道。这似乎只以 Protobuf 格式生成输出,但由于我是 运行 批量推理,所以我想将结果通过管道传输到数据库。 (数据库输出似乎应该是批量推理的默认输出,因为批量推理和数据库访问都利用了并行化……但 Protobuf 是按记录的序列化格式。)
我想我可以使用类似 Parquet-Avro-Protobuf to do the conversion (though that's in Java and the rest of the pipeline's in Python), or I could write something myself to consume all the protobuf messages one-by-one, convert them into JSON, deserialize the JSON into a list of dicts, and load the dict into a Pandas DataFrame, or store it as a bunch of key-value pairs which I treat like a single-use DB... but that sounds like a lot of work and pain involving parallelization and optimization for a very common use case. The top-level Protobuf message definition is Tensorflow's PredictionLog 的东西。
这 必须 是一个常见的用例,因为 TensorFlowModelAnalytics 的功能类似于 this one 消耗 Pandas 个数据帧。我宁愿能够直接写入数据库(最好是 Google BigQuery)或 Parquet 文件(因为 Parquet / Spark 似乎比 Pandas 更好地并行化),而且,那些看起来像他们应该是常见的用例,但我还没有找到任何例子。也许我使用了错误的搜索词?
我还看了 PredictExtractor, since "extracting predictions" sounds close to what I want... but the official documentation appears silent on how that class is supposed to be used. I thought TFTransformOutput 听起来像一个很有前途的动词,但它是一个名词。
我显然遗漏了一些基本的东西。有没有人想将 BulkInferrer 结果存储在数据库中的原因?是否有允许我将结果写入数据库的配置选项?也许我想添加一个 ParquetIO or BigQueryIO instance to the TFX pipeline? (TFX docs say it uses Beam "under the hood" 但这并没有说明我应该如何一起使用它们。)但是这些文档中的语法看起来与我的 TFX 代码完全不同,我不确定它们是否'重新兼容?
帮忙?
(从相关问题中复制以提高可见性)
经过一番挖掘,这里有一个替代方法,它假设事先不了解 feature_spec
。执行以下操作:
- 通过向组件构造添加 output_example_spec,将
BulkInferrer
设置为写入output_examples
而不是inference_result
。 - 在
BulkInferrer
之后的主管道中添加一个StatisticsGen
和一个SchemaGen
组件,以生成上述output_examples
的架构
- 使用
SchemaGen
和BulkInferrer
中的工件读取 TFRecords 并执行任何必要的操作。
bulk_inferrer = BulkInferrer(
....
output_example_spec=bulk_inferrer_pb2.OutputExampleSpec(
output_columns_spec=[bulk_inferrer_pb2.OutputColumnsSpec(
predict_output=bulk_inferrer_pb2.PredictOutput(
output_columns=[bulk_inferrer_pb2.PredictOutputCol(
output_key='original_label_name',
output_column='output_label_column_name', )]))]
))
statistics = StatisticsGen(
examples=bulk_inferrer.outputs.output_examples
)
schema = SchemaGen(
statistics=statistics.outputs.output,
)
之后,可以进行以下操作:
import tensorflow as tf
from tfx.utils import io_utils
from tensorflow_transform.tf_metadata import schema_utils
# read schema from SchemaGen
schema_path = '/path/to/schemagen/schema.pbtxt'
schema_proto = io_utils.SchemaReader().read(schema_path)
spec = schema_utils.schema_as_feature_spec(schema_proto).feature_spec
# read inferred results
data_files = ['/path/to/bulkinferrer/output_examples/examples/examples-00000-of-00001.gz']
dataset = tf.data.TFRecordDataset(data_files, compression_type='GZIP')
# parse dataset with spec
def parse(raw_record):
return tf.io.parse_example(raw_record, spec)
dataset = dataset.map(parse)
在这一点上,数据集就像任何其他已解析的数据集一样,因此编写 CSV 或 BigQuery table 或从那里编写的任何东西都是微不足道的。它确实帮助了我们ZenML with our BatchInferencePipeline。
在这里回答我自己的问题以记录我们所做的事情,尽管我认为下面@Hamza Tahir 的回答客观上更好。这可能会为其他需要更改开箱即用 TFX 组件操作的情况提供一个选项。虽然它很老套:
我们复制并编辑了文件 tfx/components/bulk_inferrer/executor.py,在 _run_model_inference()
方法的内部管道中替换了这个转换:
| 'WritePredictionLogs' >> beam.io.WriteToTFRecord(
os.path.join(inference_result.uri, _PREDICTION_LOGS_FILE_NAME),
file_name_suffix='.gz',
coder=beam.coders.ProtoCoder(prediction_log_pb2.PredictionLog)))
这个:
| 'WritePredictionLogsBigquery' >> beam.io.WriteToBigQuery(
'our_project:namespace.TableName',
schema='SCHEMA_AUTODETECT',
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
custom_gcs_temp_location='gs://our-storage-bucket/tmp',
temp_file_format='NEWLINE_DELIMITED_JSON',
ignore_insert_ids=True,
)
(这是有效的,因为当您导入 BulkInferrer 组件时,每个节点的工作都会分包给工作节点上的这些执行程序 运行,并且 TFX 将自己的库复制到这些节点上。它不会不过,不要从用户 space 库中复制 所有内容,这就是为什么我们不能只继承 BulkInferrer 并导入我们的自定义版本。)
我们必须确保 'our_project:namespace.TableName'
处的 table 具有与模型输出兼容的模式,但不必将该模式转换为 JSON / AVRO。
从理论上讲,我的团队想用围绕此构建的 TFX 发出拉取请求,但目前我们正在对几个关键参数进行硬编码,没有时间将其变为现实public/生产状态。
我来晚了一点,但这是我用于此任务的一些代码:
import tensorflow as tf
from tensorflow_serving.apis import prediction_log_pb2
import pandas as pd
def parse_prediction_logs(inference_filenames: List[Text]): -> pd.DataFrame
"""
Args:
inference files: tf.io.gfile.glob(Inferrer artifact uri)
Returns:
a dataframe of userids, predictions, and features
"""
def parse_log(pbuf):
# parse the protobuf
message = prediction_log_pb2.PredictionLog()
message.ParseFromString(pbuf)
# my model produces scores and classes and I extract the topK classes
predictions = [x.decode() for x in (message
.predict_log
.response
.outputs['output_2']
.string_val
)[:10]]
# here I parse the input tf.train.Example proto
inputs = tf.train.Example()
inputs.ParseFromString(message
.predict_log
.request
.inputs['input_1'].string_val[0]
)
# you can pull out individual features like this
uid = inputs.features.feature["userId"].bytes_list.value[0].decode()
feature1 = [
x.decode() for x in inputs.features.feature["feature1"].bytes_list.value
]
feature2 = [
x.decode() for x in inputs.features.feature["feature2"].bytes_list.value
]
return (uid, predictions, feature1, feature2)
return pd.DataFrame(
[parse_log(x) for x in
tf.data.TFRecordDataset(inference_filenames, compression_type="GZIP").as_numpy_iterator()
], columns = ["userId", "predictions", "feature1", "feature2"]
)