TFX - How to inspect records from CsvExampleGen

Question

How can I inspect the data loaded into a TFX CsvExampleGen?

CSV

The first 3 rows of california_housing_train.csv look like this.

longitude latitude housing_median_age total_rooms total_bedrooms population households median_income median_house_value
-122.05 37.37 27 3885 661 1537 606 6.6085 344700
-118.3 34.26 43 1510 310 809 277 3.599 176500
-117.81 33.78 27 3589 507 1484 495 5.7934 270500

CsvExampleGen

The CSV is loaded into CsvExampleGen. As I understand it, an XXXExampleGen component generates tf.Record instances, so I wonder whether there is a way to iterate through the records in CsvExampleGen.

from tfx.components import (
    CsvExampleGen
)
housing = CsvExampleGen("sample_data/california_housing_train.csv")
housing
----------
CsvExampleGen(
    spec: <tfx.types.standard_component_specs.FileBasedExampleGenSpec object at 0x7fcd90435450>,
    executor_spec: <tfx.dsl.components.base.executor_spec.BeamExecutorSpec object at 0x7fcd90435850>,
    driver_class: <class 'tfx.components.example_gen.driver.FileBasedDriver'>,
    component_id: CsvExampleGen,
    inputs: {},
    outputs: {
        'examples': OutputChannel(artifact_type=Examples,
        producer_component_id=CsvExampleGen,
        output_key=examples,
        additional_properties={},
        additional_custom_properties={})
    }
)

Experiment

for record in housing.outputs['examples']:
    print(record)

TypeError                                 Traceback (most recent call last)
----> 1 for record in housing.outputs['examples']:
      2     print(record)

TypeError: 'OutputChannel' object is not iterable


Have you had a chance to look at this section in the tutorials, which explains how to display the artifacts of the ExampleGen component? You can modify the code below (source: TFX Tutorial) to achieve the same result.

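import os
import pprint

import tensorflow as tf

pp = pprint.PrettyPrinter()

# `example_gen` is the ExampleGen component (`housing` in the question). It is
# assumed to have been executed already, e.g. with context.run(example_gen) in
# an InteractiveContext as in the tutorial, so that its output artifact exists.

# Get the URI of the output artifact representing the training examples, which is a directory
train_uri = os.path.join(example_gen.outputs['examples'].get()[0].uri, 'Split-train')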

# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
                      for name in os.listdir(train_uri)]

# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")

# Iterate over the first 3 records and decode them.
for tfrecord in dataset.take(3):
  serialized_example = tfrecord.numpy()
  example = tf.train.Example()
  example.ParseFromString(serialized_example)
  pp.pprint(example)
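
If you need to do this more than once, the steps above can be wrapped in small helpers. The following is only a sketch under assumptions: the function names `split_tfrecord_files` and `iter_examples` are hypothetical (they are not part of TFX), and it assumes the artifact directory uses the same `Split-<name>` layout and GZIP compression as the tutorial code.

```python
import os


def split_tfrecord_files(artifact_uri, split="train"):
    """Return the sorted file paths under <artifact_uri>/Split-<split>.

    Hypothetical helper; assumes the 'Split-<name>' directory layout
    used in the tutorial snippet above.
    """
    split_dir = os.path.join(artifact_uri, f"Split-{split}")
    return sorted(os.path.join(split_dir, name) for name in os.listdir(split_dir))


def iter_examples(artifact_uri, split="train", limit=3):
    """Yield up to `limit` decoded tf.train.Example protos from one split.

    Hypothetical helper; assumes the files are GZIP-compressed TFRecords.
    """
    # Imported lazily so that listing files does not require TensorFlow.
    import tensorflow as tf

    files = split_tfrecord_files(artifact_uri, split)
    dataset = tf.data.TFRecordDataset(files, compression_type="GZIP")
    for tfrecord in dataset.take(limit):
        example = tf.train.Example()
        example.ParseFromString(tfrecord.numpy())
        yield example
```

With the component from the question, and assuming it has already been run, usage would look like `for example in iter_examples(housing.outputs['examples'].get()[0].uri): print(example)`.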

Let us know if this helps. Thanks!