Tensor Flow TFX 管道中的图像处理
Image processing in Tensor flow TFX pipelines
我正在尝试建立一个 Tensorflow TFX 管道并 运行 使用 MNIST 数据集。
# Imports
import pandas as pd
import numpy as np
from keras.datasets import mnist
import tensorflow as tf
from tfx import v1 as tfx
import os
from tfx.components import ImportExampleGen
from platform import python_version
python_version() #'3.8.8'
# Load the data - 60,000 training examples and 10,000 testing examples
(train_x, train_y), (test_x, test_y) = mnist.load_data()
设置管道路径
_pipeline_root = './pipeline'
_data_root = './data'
if not os.path.isdir(_pipeline_root) and not os.path.isdir(_data_root):
!mkdir {_pipeline_root}
!mkdir {_data_root}
将数据写入 TF.record 格式并保存在 eval 和 train 目录中。请注意,MNIST 数据以 28x28 的 numpy 数组开始,并转换为字节串以使其能够被编码为 Tf.record.
的一部分
def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
if isinstance(value, type(tf.constant(0))): # if value ist tensor
value = value.numpy() # get value of tensor
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _int64_feature(value):
"""Returns an int64_list from a bool / enum / int / uint."""
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def serialize_array(array):
array = tf.io.serialize_tensor(array)
return array
def image_label_to_tf_train(image, label):
image_shape = np.shape(image)
#define the dictionary -- the structure -- of our single example
data = {
'height': _int64_feature(image_shape[0]),
'width': _int64_feature(image_shape[1]),
'raw_image' : _bytes_feature(serialize_array(image)),
'label' : _int64_feature(label)
}
#create an Example, wrapping the single features
return tf.train.Example(features=tf.train.Features(feature=data))
def write_images_to_tfr_short(images, labels, filename:str="images", folder = ""):
if not os.path.isdir(folder):
!mkdir {folder}
filename= folder + "/" + filename+".tfrecords"
writer = tf.io.TFRecordWriter(filename) #create a writer that'll store our data to disk
count = 0
for index in range(len(images)):
#get the data we want to write
current_image = images[index]
current_label = labels[index]
out = image_label_to_tf_train(image=current_image, label=current_label)
writer.write(out.SerializeToString())
count += 1
writer.close()
print(f"Wrote {count} elements to TFRecord")
return count
下一步是调用使用 preprocessing_fn 的转换组件。此函数应处理所有数据,例如将图像数组除以 255 是标准特征处理。但是图像仍然是字节串,我一辈子都想不出如何将它转回数组。下面是我试过的。
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
# Initialize outputs dictionary
outputs = {}
raw_image_dataset = inputs[_IMAGE_KEY]
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
outputs[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.int64)
return outputs
我收到以下错误:
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-37-7beafa4fe436> in <module>
3 schema=schema_gen.outputs['schema'],
4 module_file=os.path.abspath(_mnist_transform_module))
----> 5 context.run(transform, enable_cache=False)
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
61 # __IPYTHON__ variable is set by IPython, see
62 # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 63 return fn(*args, **kwargs)
64 else:
65 absl.logging.warning(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
181 telemetry_utils.LABEL_TFX_RUNNER: runner_label,
182 }):
--> 183 execution_id = launcher.launch().execution_id
184
185 return execution_result.ExecutionResult(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
198 # be immutable in this context.
199 # output_dict can still be changed, specifically properties.
--> 200 self._run_executor(execution_decision.execution_id,
201 copy.deepcopy(execution_decision.input_dict),
202 execution_decision.output_dict,
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
71 # be immutable in this context.
72 # output_dict can still be changed, specifically properties.
---> 73 executor.Do(
74 copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
581 # remove the `_pip_dependencies` attribute.
582 with udf_utils.TempPipInstallContext(self._pip_dependencies):
--> 583 TransformProcessor().Transform(label_inputs, label_outputs, status_file)
584 logging.debug('Cleaning up temp path %s on executor success', temp_path)
585 io_utils.delete_dir(temp_path)
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
1114 materialization_format = (
1115 transform_paths_file_formats[-1] if materialize_output_paths else None)
-> 1116 self._RunBeamImpl(analyze_data_list, transform_data_list, preprocessing_fn,
1117 stats_options_updater_fn, force_tf_compat_v1,
1118 input_dataset_metadata, transform_output_path,
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in _RunBeamImpl(self, analyze_data_list, transform_data_list, preprocessing_fn, stats_options_updater_fn, force_tf_compat_v1, input_dataset_metadata, transform_output_path, raw_examples_data_format, temp_path, input_cache_dir, output_cache_dir, disable_statistics, per_set_stats_output_paths, materialization_format, analyze_paths_count, stats_output_paths, make_beam_pipeline_fn)
1496 for dataset in transform_data_list:
1497 infix = 'TransformIndex{}'.format(dataset.index)
-> 1498 (dataset.transformed
1499 | 'EncodeAndSerialize[{}]'.format(infix) >> beam.ParDo(
1500 self._RecordBatchToExamplesFn(transformed_schema_proto))
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
594 try:
595 if not exc_type:
--> 596 self.result = self.run()
597 self.result.wait_until_finish()
598 finally:
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
571 finally:
572 shutil.rmtree(tmpdir)
--> 573 return self.runner.run_pipeline(self, self._options)
574 finally:
575 if not is_in_ipython():
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129 runner = BundleBasedDirectRunner()
130
--> 131 return runner.run_pipeline(pipeline, options)
132
133
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
197 options.view_as(pipeline_options.ProfilingOptions))
198
--> 199 self._latest_run_result = self.run_via_runner_api(
200 pipeline.to_runner_api(default_environment=self._default_environment))
201 return self._latest_run_result
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
208 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
209 # the teststream (if any), and all the stages).
--> 210 return self.run_stages(stage_context, stages)
211
212 @contextlib.contextmanager
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
393 )
394
--> 395 stage_results = self._run_stage(
396 runner_execution_context, bundle_context_manager)
397
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
658 while True:
659 last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660 self._run_bundle(
661 runner_execution_context,
662 bundle_context_manager,
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
781 expected_timer_output)
782
--> 783 result, splits = bundle_manager.process_bundle(
784 data_input, data_output, input_timers, expected_timer_output)
785 # Now we collect all the deferred inputs remaining from bundle execution.
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
1092 process_bundle_descriptor.id,
1093 cache_tokens=[next(self._cache_token_generator)]))
-> 1094 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1095
1096 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
376 self._uid_counter += 1
377 request.instruction_id = 'control_%s' % self._uid_counter
--> 378 response = self.worker.do_instruction(request)
379 return ControlFuture(request.instruction_id, response)
380
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
578 if request_type:
579 # E.g. if register is set, this will call self.register(request.register))
--> 580 return getattr(self, request_type)(
581 getattr(request, request_type), request.instruction_id)
582 else:
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
616 with self.maybe_profile(instruction_id):
617 delayed_applications, requests_finalization = (
--> 618 bundle_processor.process_bundle(instruction_id))
619 monitoring_infos = bundle_processor.monitoring_infos()
620 monitoring_infos.extend(self.state_cache_metrics_fn())
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
993 element.timer_family_id, timer_data)
994 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995 input_op_by_transform_id[element.transform_id].process_encoded(
996 element.data)
997
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
219 decoded_value = self.windowed_coder_impl.decode_from_stream(
220 input_stream, True)
--> 221 self.output(decoded_value)
222
223 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1635 from apache_beam.transforms.util import fn_takes_side_inputs
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
1639 wrapper = lambda x: [fn(x)]
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
661 """
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
665 baseline_analyzers_fingerprint,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
892 if not concrete_transform_fn.graph.get_collection(
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
896 tensor_replacement_map)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
804 evaluate_schema_overrides=True)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
808 concrete_metadata_fn,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
254 tensor_annotations, global_annotations = _get_schema_annotations_v2(
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
258 tensor_ranges,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
299 domains[name] = schema_pb2.IntDomain(
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
303
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
127 if is_evaluation_complete and any(
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
131 'from the batch dimension, all dimensions must have known size'
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size [while running 'Analyze/CreateSavedModel[tf_v2_only]/CreateSavedModel']
我知道标签功能正在运行,因为我可以调用下面的代码并打印出来....
transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_mnist_transform_module))
context.run(transform, enable_cache=False)
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Decode the first record and print output
for tfrecord in dataset.take(1):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
print(example)
如果我删除这些行:
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
我打印了
features {
feature {
key: "label"
value {
int64_list {
value: 5
}
}
}
}
这表明我对标签功能所做的工作是有效的,但我真的不知道如何转换图像字节。部分问题是我不完全确定格式是什么,因为它只是一个非常不透明的张量。似乎给出了标签操作,我正在有效地对一列数据进行操作,但同样,无法确定正确的操作或语法
所以我想我使用
解决了这个问题
raw_image_dataset = inputs[_IMAGE_KEY]
raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.decode_image(x[0]) , elems = raw_image_dataset, dtype=tf.uint8)
关于批量输入的数据有些问题,因此需要对其进行映射并使用生成的张量“x[0]”的正确分量,我仍然不能 100% 确定为什么会这样案例,但似乎 运行.
现在我正在为 TFX 苦苦挣扎,因为它不允许我输出与输入内容不同的功能...
对于任何未来的观众来说,这都有效
raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.parse_tensor(x[0], tf.uint8, name=None), elems = raw_image_dataset, fn_output_signature = tf.TensorSpec((28,28),dtype=tf.uint8, name=None), infer_shape = True)
raw_image_dataset = tf.cast(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = raw_image_dataset
我正在尝试建立一个 Tensorflow TFX 管道并 运行 使用 MNIST 数据集。
# Imports
import pandas as pd
import numpy as np
from keras.datasets import mnist
import tensorflow as tf
from tfx import v1 as tfx
import os
from tfx.components import ImportExampleGen
from platform import python_version
python_version() #'3.8.8'
# Load the data - 60,000 training examples and 10,000 testing examples
(train_x, train_y), (test_x, test_y) = mnist.load_data()
设置管道路径
_pipeline_root = './pipeline'
_data_root = './data'
if not os.path.isdir(_pipeline_root) and not os.path.isdir(_data_root):
!mkdir {_pipeline_root}
!mkdir {_data_root}
将数据写入 TF.record 格式并保存在 eval 和 train 目录中。请注意,MNIST 数据以 28x28 的 numpy 数组开始,并转换为字节串以使其能够被编码为 Tf.record.
的一部分
def _bytes_feature(value):
"""Returns a bytes_list from a string / byte."""
if isinstance(value, type(tf.constant(0))): # if value ist tensor
value = value.numpy() # get value of tensor
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def _int64_feature(value):
"""Returns an int64_list from a bool / enum / int / uint."""
return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))
def serialize_array(array):
array = tf.io.serialize_tensor(array)
return array
def image_label_to_tf_train(image, label):
image_shape = np.shape(image)
#define the dictionary -- the structure -- of our single example
data = {
'height': _int64_feature(image_shape[0]),
'width': _int64_feature(image_shape[1]),
'raw_image' : _bytes_feature(serialize_array(image)),
'label' : _int64_feature(label)
}
#create an Example, wrapping the single features
return tf.train.Example(features=tf.train.Features(feature=data))
def write_images_to_tfr_short(images, labels, filename:str="images", folder = ""):
if not os.path.isdir(folder):
!mkdir {folder}
filename= folder + "/" + filename+".tfrecords"
writer = tf.io.TFRecordWriter(filename) #create a writer that'll store our data to disk
count = 0
for index in range(len(images)):
#get the data we want to write
current_image = images[index]
current_label = labels[index]
out = image_label_to_tf_train(image=current_image, label=current_label)
writer.write(out.SerializeToString())
count += 1
writer.close()
print(f"Wrote {count} elements to TFRecord")
return count
下一步是调用使用 preprocessing_fn 的转换组件。此函数应处理所有数据,例如将图像数组除以 255 是标准特征处理。但是图像仍然是字节串,我一辈子都想不出如何将它转回数组。下面是我试过的。
def preprocessing_fn(inputs):
"""tf.transform's callback function for preprocessing inputs.
Args:
inputs: map from feature keys to raw not-yet-transformed features.
Returns:
Map from string feature key to transformed feature operations.
"""
# Initialize outputs dictionary
outputs = {}
raw_image_dataset = inputs[_IMAGE_KEY]
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
outputs[_LABEL_KEY] = tf.cast(inputs[_LABEL_KEY], tf.int64)
return outputs
我收到以下错误:
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:This output type hint will be ignored and not used for type-checking purposes. Typically, output type hints for a PTransform are single (or nested) types wrapped by a PCollection, PDone, or None. Got: Tuple[Dict[str, Union[NoneType, _Dataset]], Union[Dict[str, Dict[str, PCollection]], NoneType], int] instead.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.8 interpreter.
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
INFO:tensorflow:Assets written to: ./pipeline/Transform/transform_graph/225/.temp_path/tftransform_tmp/26150ae80de847fab932efeb0f0c610f/assets
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
<ipython-input-37-7beafa4fe436> in <module>
3 schema=schema_gen.outputs['schema'],
4 module_file=os.path.abspath(_mnist_transform_module))
----> 5 context.run(transform, enable_cache=False)
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run_if_ipython(*args, **kwargs)
61 # __IPYTHON__ variable is set by IPython, see
62 # https://ipython.org/ipython-doc/rel-0.10.2/html/interactive/reference.html#embedding-ipython.
---> 63 return fn(*args, **kwargs)
64 else:
65 absl.logging.warning(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/experimental/interactive/interactive_context.py in run(self, component, enable_cache, beam_pipeline_args)
181 telemetry_utils.LABEL_TFX_RUNNER: runner_label,
182 }):
--> 183 execution_id = launcher.launch().execution_id
184
185 return execution_result.ExecutionResult(
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/base_component_launcher.py in launch(self)
198 # be immutable in this context.
199 # output_dict can still be changed, specifically properties.
--> 200 self._run_executor(execution_decision.execution_id,
201 copy.deepcopy(execution_decision.input_dict),
202 execution_decision.output_dict,
/opt/conda/lib/python3.8/site-packages/tfx/orchestration/launcher/in_process_component_launcher.py in _run_executor(self, execution_id, input_dict, output_dict, exec_properties)
71 # be immutable in this context.
72 # output_dict can still be changed, specifically properties.
---> 73 executor.Do(
74 copy.deepcopy(input_dict), output_dict, copy.deepcopy(exec_properties))
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Do(self, input_dict, output_dict, exec_properties)
581 # remove the `_pip_dependencies` attribute.
582 with udf_utils.TempPipInstallContext(self._pip_dependencies):
--> 583 TransformProcessor().Transform(label_inputs, label_outputs, status_file)
584 logging.debug('Cleaning up temp path %s on executor success', temp_path)
585 io_utils.delete_dir(temp_path)
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in Transform(***failed resolving arguments***)
1114 materialization_format = (
1115 transform_paths_file_formats[-1] if materialize_output_paths else None)
-> 1116 self._RunBeamImpl(analyze_data_list, transform_data_list, preprocessing_fn,
1117 stats_options_updater_fn, force_tf_compat_v1,
1118 input_dataset_metadata, transform_output_path,
/opt/conda/lib/python3.8/site-packages/tfx/components/transform/executor.py in _RunBeamImpl(self, analyze_data_list, transform_data_list, preprocessing_fn, stats_options_updater_fn, force_tf_compat_v1, input_dataset_metadata, transform_output_path, raw_examples_data_format, temp_path, input_cache_dir, output_cache_dir, disable_statistics, per_set_stats_output_paths, materialization_format, analyze_paths_count, stats_output_paths, make_beam_pipeline_fn)
1496 for dataset in transform_data_list:
1497 infix = 'TransformIndex{}'.format(dataset.index)
-> 1498 (dataset.transformed
1499 | 'EncodeAndSerialize[{}]'.format(infix) >> beam.ParDo(
1500 self._RecordBatchToExamplesFn(transformed_schema_proto))
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
594 try:
595 if not exc_type:
--> 596 self.result = self.run()
597 self.result.wait_until_finish()
598 finally:
/opt/conda/lib/python3.8/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
571 finally:
572 shutil.rmtree(tmpdir)
--> 573 return self.runner.run_pipeline(self, self._options)
574 finally:
575 if not is_in_ipython():
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
129 runner = BundleBasedDirectRunner()
130
--> 131 return runner.run_pipeline(pipeline, options)
132
133
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
197 options.view_as(pipeline_options.ProfilingOptions))
198
--> 199 self._latest_run_result = self.run_via_runner_api(
200 pipeline.to_runner_api(default_environment=self._default_environment))
201 return self._latest_run_result
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
208 # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
209 # the teststream (if any), and all the stages).
--> 210 return self.run_stages(stage_context, stages)
211
212 @contextlib.contextmanager
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
393 )
394
--> 395 stage_results = self._run_stage(
396 runner_execution_context, bundle_context_manager)
397
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
658 while True:
659 last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660 self._run_bundle(
661 runner_execution_context,
662 bundle_context_manager,
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
781 expected_timer_output)
782
--> 783 result, splits = bundle_manager.process_bundle(
784 data_input, data_output, input_timers, expected_timer_output)
785 # Now we collect all the deferred inputs remaining from bundle execution.
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
1092 process_bundle_descriptor.id,
1093 cache_tokens=[next(self._cache_token_generator)]))
-> 1094 result_future = self._worker_handler.control_conn.push(process_bundle_req)
1095
1096 split_results = [] # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
376 self._uid_counter += 1
377 request.instruction_id = 'control_%s' % self._uid_counter
--> 378 response = self.worker.do_instruction(request)
379 return ControlFuture(request.instruction_id, response)
380
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
578 if request_type:
579 # E.g. if register is set, this will call self.register(request.register))
--> 580 return getattr(self, request_type)(
581 getattr(request, request_type), request.instruction_id)
582 else:
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
616 with self.maybe_profile(instruction_id):
617 delayed_applications, requests_finalization = (
--> 618 bundle_processor.process_bundle(instruction_id))
619 monitoring_infos = bundle_processor.monitoring_infos()
620 monitoring_infos.extend(self.state_cache_metrics_fn())
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
993 element.timer_family_id, timer_data)
994 elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995 input_op_by_transform_id[element.transform_id].process_encoded(
996 element.data)
997
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
219 decoded_value = self.windowed_coder_impl.decode_from_stream(
220 input_stream, True)
--> 221 self.output(decoded_value)
222
223 def monitoring_infos(self, transform_id, tag_to_pcollection_id):
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.Operation.output()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common._OutputProcessor.process_outputs()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/worker/operations.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.worker.operations.DoOperation.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.DoFnRunner.process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker.invoke_process()
/opt/conda/lib/python3.8/site-packages/apache_beam/runners/common.cpython-38-x86_64-linux-gnu.so in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window()
/opt/conda/lib/python3.8/site-packages/apache_beam/transforms/core.py in <lambda>(x, *args, **kwargs)
1635 from apache_beam.transforms.util import fn_takes_side_inputs
1636 if fn_takes_side_inputs(fn):
-> 1637 wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
1638 else:
1639 wrapper = lambda x: [fn(x)]
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/beam/impl.py in _create_v2_saved_model(tensor_replacement_map, base_temp_dir, preprocessing_fn, input_signature, baseline_analyzers_fingerprint, output_keys_to_name_map)
661 """
662 saved_model_dir = beam_common.get_unique_temp_path(base_temp_dir)
--> 663 impl_helper.trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn,
664 input_signature, base_temp_dir,
665 baseline_analyzers_fingerprint,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in trace_and_write_v2_saved_model(saved_model_dir, preprocessing_fn, input_signature, base_temp_dir, baseline_analyzers_fingerprint, tensor_replacement_map, output_keys_to_name_map)
892 if not concrete_transform_fn.graph.get_collection(
893 analyzer_nodes.TENSOR_REPLACEMENTS):
--> 894 metadata = _trace_and_get_metadata(concrete_transform_fn, structured_inputs,
895 preprocessing_fn, base_temp_dir,
896 tensor_replacement_map)
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/impl_helper.py in _trace_and_get_metadata(concrete_transform_fn, structured_inputs, preprocessing_fn, base_temp_dir, tensor_replacement_map)
804 evaluate_schema_overrides=True)
805 return dataset_metadata.DatasetMetadata(
--> 806 schema=schema_inference.infer_feature_schema_v2(
807 concrete_transform_fn.structured_outputs,
808 concrete_metadata_fn,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in infer_feature_schema_v2(features, concrete_metadata_fn, evaluate_schema_overrides)
254 tensor_annotations, global_annotations = _get_schema_annotations_v2(
255 metadata)
--> 256 return _infer_feature_schema_common(
257 features,
258 tensor_ranges,
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _infer_feature_schema_common(features, tensor_ranges, feature_annotations, global_annotations, is_evaluation_complete)
299 domains[name] = schema_pb2.IntDomain(
300 min=min_value, max=max_value, is_categorical=True)
--> 301 feature_spec = _feature_spec_from_batched_tensors(features,
302 is_evaluation_complete)
303
/opt/conda/lib/python3.8/site-packages/tensorflow_transform/schema_inference.py in _feature_spec_from_batched_tensors(tensors, is_evaluation_complete)
127 if is_evaluation_complete and any(
128 dim is None for dim in shape.as_list()[1:]):
--> 129 raise ValueError(
130 'Feature {} ({}) had invalid shape {} for FixedLenFeature: apart '
131 'from the batch dimension, all dimensions must have known size'
ValueError: Feature raw_image (Tensor("Identity_1:0", shape=(None, 1, None), dtype=int64)) had invalid shape (None, 1, None) for FixedLenFeature: apart from the batch dimension, all dimensions must have known size [while running 'Analyze/CreateSavedModel[tf_v2_only]/CreateSavedModel']
我知道标签功能正在运行,因为我可以调用下面的代码并打印出来....
transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_mnist_transform_module))
context.run(transform, enable_cache=False)
# Get the URI of the output artifact representing the transformed examples
train_uri = os.path.join(transform.outputs['transformed_examples'].get()[0].uri, 'Split-train')
# Get the list of files in this directory (all compressed TFRecord files)
tfrecord_filenames = [os.path.join(train_uri, name)
for name in os.listdir(train_uri)]
# Create a `TFRecordDataset` to read these files
dataset = tf.data.TFRecordDataset(tfrecord_filenames, compression_type="GZIP")
# Decode the first record and print output
for tfrecord in dataset.take(1):
serialized_example = tfrecord.numpy()
example = tf.train.Example()
example.ParseFromString(serialized_example)
print(example)
如果我删除这些行:
img = tf.io.decode_raw(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = img
我打印了
features {
feature {
key: "label"
value {
int64_list {
value: 5
}
}
}
}
这表明我对标签功能所做的工作是有效的,但我真的不知道如何转换图像字节。部分问题是我不完全确定格式是什么,因为它只是一个非常不透明的张量。似乎给出了标签操作,我正在有效地对一列数据进行操作,但同样,无法确定正确的操作或语法
所以我想我使用
解决了这个问题raw_image_dataset = inputs[_IMAGE_KEY]
raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.decode_image(x[0]) , elems = raw_image_dataset, dtype=tf.uint8)
关于批量输入的数据有些问题,因此需要对其进行映射并使用生成的张量“x[0]”的正确分量,我仍然不能 100% 确定为什么会这样案例,但似乎 运行.
现在我正在为 TFX 苦苦挣扎,因为它不允许我输出与输入内容不同的功能...
对于任何未来的观众来说,这都有效
raw_image_dataset = tf.map_fn(fn = lambda x : tf.io.parse_tensor(x[0], tf.uint8, name=None), elems = raw_image_dataset, fn_output_signature = tf.TensorSpec((28,28),dtype=tf.uint8, name=None), infer_shape = True)
raw_image_dataset = tf.cast(raw_image_dataset, tf.int64)
outputs[_IMAGE_KEY] = raw_image_dataset