将 .tfrecords 文件拆分为多个 .tfrecords 文件
Split .tfrecords file into many .tfrecords files
有什么方法可以直接将 .tfrecords 文件拆分为多个 .tfrecords 文件,而无需写回每个数据集示例?
您可以使用这样的函数:
import tensorflow as tf
def split_tfrecord(tfrecord_path, split_size):
with tf.Graph().as_default(), tf.Session() as sess:
ds = tf.data.TFRecordDataset(tfrecord_path).batch(split_size)
batch = ds.make_one_shot_iterator().get_next()
part_num = 0
while True:
try:
records = sess.run(batch)
part_path = tfrecord_path + '.{:03d}'.format(part_num)
with tf.python_io.TFRecordWriter(part_path) as writer:
for record in records:
writer.write(record)
part_num += 1
except tf.errors.OutOfRangeError: break
例如,要将文件 my_records.tfrecord
分成多个部分,每个部分包含 100 条记录,您可以这样做:
split_tfrecord(my_records.tfrecord, 100)
这将创建多个较小的记录文件 my_records.tfrecord.000
、my_records.tfrecord.001
等
在 tensorflow 2.0.0 中,这将起作用:
import tensorflow as tf
raw_dataset = tf.data.TFRecordDataset("input_file.tfrecord")
shards = 10
for i in range(shards):
writer = tf.data.experimental.TFRecordWriter(f"output_file-part-{i}.tfrecord")
writer.write(raw_dataset.shard(shards, i))
使用 .batch()
而不是 .shard()
以避免多次迭代数据集
一种更高效的方法(与使用 tf.data.Dataset.shard()
相比)是使用批处理:
import tensorflow as tf
ITEMS_PER_FILE = 100 # Assuming we are saving 100 items per .tfrecord file
raw_dataset = tf.data.TFRecordDataset('in.tfrecord')
batch_idx = 0
for batch in raw_dataset.batch(ITEMS_PER_FILE):
# Converting `batch` back into a `Dataset`, assuming batch is a `tuple` of `tensors`
batch_ds = tf.data.Dataset.from_tensor_slices(tuple([*batch]))
filename = f'out.tfrecord.{batch_idx:03d}'
writer = tf.data.experimental.TFRecordWriter(filename)
writer.write(batch_ds)
batch_idx += 1
TensorFlow 非常有效的方法2.x
正如@yongjieyongjie 所提到的,您应该使用 .batch()
而不是 .shard()
以避免根据需要更频繁地迭代数据集。
但是如果你有一个非常大的数据集,对于内存来说太大了,它会失败(但没有错误),只会给你一些文件和原始数据集的一小部分。
首先你应该对你的数据集进行批处理,并使用你希望每个文件拥有的记录数量作为批量大小(我假设你的数据集已经是序列化格式,否则请参阅 here)。
dataset = dataset.batch(ITEMS_PER_FILE)
您接下来要做的是使用生成器来避免 运行 内存不足。
def write_generator():
i = 0
iterator = iter(dataset)
optional = iterator.get_next_as_optional()
while optional.has_value().numpy():
ds = optional.get_value()
optional = iterator.get_next_as_optional()
batch_ds = tf.data.Dataset.from_tensor_slices(ds)
writer = tf.data.experimental.TFRecordWriter(save_to + "\" + name + "-" + str(i) + ".tfrecord", compression_type='GZIP')#compression_type='GZIP'
i += 1
yield batch_ds, writer, i
return
现在只需正常使用生成器即可 for-loop
for data, wri, i in write_generator():
start_time = time.time()
wri.write(data)
print("Time needed: ", time.time() - start_time, "s", "\t", NAME_OF_FILES + "-" + str(i) + ".tfrecord")
只要一个文件适合内存中的原始文件,这应该就可以正常工作。
劈叉不均匀
如果您想平均分割成大小相同的文件,其他大多数答案都适用。这将适用于不均匀的分割:
# `splits` is a list of the number of records you want in each output file
def split_files(filename: str, splits: List[int]) -> None:
dataset: tf.data.Dataset = tf.data.TFRecordDataset(filename)
rec_counter: int = 0
# An extra iteration over the data to get the size
total_records: int = len([r for r in dataset])
print(f"Found {total_records} records in source file.")
if sum(splits) != total_records:
raise ValueError(f"Sum of splits {sum(splits)} does not equal "
f"total number of records {total_records}")
rec_iter:Iterator = iter(dataset)
split: int
for split_idx, split in enumerate(splits):
outfile: str = filename + f".{split_idx}-{split}"
with tf.io.TFRecordWriter(outfile) as writer:
for out_idx in range(split):
rec: tf.Tensor = next(rec_iter, None)
rec_counter +=1
writer.write(rec.numpy())
print(f"Finished writing {split} records to file {split_idx}")
虽然我认为从技术上讲 OP 询问了 without writing back each Dataset example
(这就是它所做的),但这至少是在不反序列化每个示例的情况下进行的。
对于非常大的文件来说有点慢。可能有一种方法可以修改其他一些基于批处理的答案,以便使用批处理输入读取但仍然写入不均匀的拆分,但我没有尝试过。
分成N个splits (在tensorflow 1.13.1中测试)
import os
import hashlib
import tensorflow as tf
from tqdm import tqdm
def split_tfrecord(tfrecord_path, n_splits):
dataset = tf.data.TFRecordDataset(tfrecord_path)
outfiles=[]
for n_split in range(n_splits):
output_tfrecord_dir = f"{os.path.splitext(tfrecord_path)[0]}"
if not os.path.exists(output_tfrecord_dir):
os.makedirs(output_tfrecord_dir)
output_tfrecord_path=os.path.join(output_tfrecord_dir, f"{n_split:03d}.tfrecord")
out_f = tf.io.TFRecordWriter(output_tfrecord_path)
outfiles.append(out_f)
for record in tqdm(dataset):
sample = tf.train.Example()
record = record.numpy()
sample.ParseFromString(record)
idx = int(hashlib.sha1(record).hexdigest(),16) % n_splits
outfiles[idx].write(example.SerializeToString())
for file in outfiles:
file.close()
有什么方法可以直接将 .tfrecords 文件拆分为多个 .tfrecords 文件,而无需写回每个数据集示例?
您可以使用这样的函数:
import tensorflow as tf
def split_tfrecord(tfrecord_path, split_size):
with tf.Graph().as_default(), tf.Session() as sess:
ds = tf.data.TFRecordDataset(tfrecord_path).batch(split_size)
batch = ds.make_one_shot_iterator().get_next()
part_num = 0
while True:
try:
records = sess.run(batch)
part_path = tfrecord_path + '.{:03d}'.format(part_num)
with tf.python_io.TFRecordWriter(part_path) as writer:
for record in records:
writer.write(record)
part_num += 1
except tf.errors.OutOfRangeError: break
例如,要将文件 my_records.tfrecord
分成多个部分,每个部分包含 100 条记录,您可以这样做:
split_tfrecord(my_records.tfrecord, 100)
这将创建多个较小的记录文件 my_records.tfrecord.000
、my_records.tfrecord.001
等
在 tensorflow 2.0.0 中,这将起作用:
import tensorflow as tf
raw_dataset = tf.data.TFRecordDataset("input_file.tfrecord")
shards = 10
for i in range(shards):
writer = tf.data.experimental.TFRecordWriter(f"output_file-part-{i}.tfrecord")
writer.write(raw_dataset.shard(shards, i))
使用 .batch()
而不是 .shard()
以避免多次迭代数据集
一种更高效的方法(与使用 tf.data.Dataset.shard()
相比)是使用批处理:
import tensorflow as tf
ITEMS_PER_FILE = 100 # Assuming we are saving 100 items per .tfrecord file
raw_dataset = tf.data.TFRecordDataset('in.tfrecord')
batch_idx = 0
for batch in raw_dataset.batch(ITEMS_PER_FILE):
# Converting `batch` back into a `Dataset`, assuming batch is a `tuple` of `tensors`
batch_ds = tf.data.Dataset.from_tensor_slices(tuple([*batch]))
filename = f'out.tfrecord.{batch_idx:03d}'
writer = tf.data.experimental.TFRecordWriter(filename)
writer.write(batch_ds)
batch_idx += 1
TensorFlow 非常有效的方法2.x
正如@yongjieyongjie 所提到的,您应该使用 .batch()
而不是 .shard()
以避免根据需要更频繁地迭代数据集。
但是如果你有一个非常大的数据集,对于内存来说太大了,它会失败(但没有错误),只会给你一些文件和原始数据集的一小部分。
首先你应该对你的数据集进行批处理,并使用你希望每个文件拥有的记录数量作为批量大小(我假设你的数据集已经是序列化格式,否则请参阅 here)。
dataset = dataset.batch(ITEMS_PER_FILE)
您接下来要做的是使用生成器来避免 运行 内存不足。
def write_generator():
i = 0
iterator = iter(dataset)
optional = iterator.get_next_as_optional()
while optional.has_value().numpy():
ds = optional.get_value()
optional = iterator.get_next_as_optional()
batch_ds = tf.data.Dataset.from_tensor_slices(ds)
writer = tf.data.experimental.TFRecordWriter(save_to + "\" + name + "-" + str(i) + ".tfrecord", compression_type='GZIP')#compression_type='GZIP'
i += 1
yield batch_ds, writer, i
return
现在只需正常使用生成器即可 for-loop
for data, wri, i in write_generator():
start_time = time.time()
wri.write(data)
print("Time needed: ", time.time() - start_time, "s", "\t", NAME_OF_FILES + "-" + str(i) + ".tfrecord")
只要一个文件适合内存中的原始文件,这应该就可以正常工作。
劈叉不均匀
如果您想平均分割成大小相同的文件,其他大多数答案都适用。这将适用于不均匀的分割:
# `splits` is a list of the number of records you want in each output file
def split_files(filename: str, splits: List[int]) -> None:
dataset: tf.data.Dataset = tf.data.TFRecordDataset(filename)
rec_counter: int = 0
# An extra iteration over the data to get the size
total_records: int = len([r for r in dataset])
print(f"Found {total_records} records in source file.")
if sum(splits) != total_records:
raise ValueError(f"Sum of splits {sum(splits)} does not equal "
f"total number of records {total_records}")
rec_iter:Iterator = iter(dataset)
split: int
for split_idx, split in enumerate(splits):
outfile: str = filename + f".{split_idx}-{split}"
with tf.io.TFRecordWriter(outfile) as writer:
for out_idx in range(split):
rec: tf.Tensor = next(rec_iter, None)
rec_counter +=1
writer.write(rec.numpy())
print(f"Finished writing {split} records to file {split_idx}")
虽然我认为从技术上讲 OP 询问了 without writing back each Dataset example
(这就是它所做的),但这至少是在不反序列化每个示例的情况下进行的。
对于非常大的文件来说有点慢。可能有一种方法可以修改其他一些基于批处理的答案,以便使用批处理输入读取但仍然写入不均匀的拆分,但我没有尝试过。
分成N个splits (在tensorflow 1.13.1中测试)
import os
import hashlib
import tensorflow as tf
from tqdm import tqdm
def split_tfrecord(tfrecord_path, n_splits):
dataset = tf.data.TFRecordDataset(tfrecord_path)
outfiles=[]
for n_split in range(n_splits):
output_tfrecord_dir = f"{os.path.splitext(tfrecord_path)[0]}"
if not os.path.exists(output_tfrecord_dir):
os.makedirs(output_tfrecord_dir)
output_tfrecord_path=os.path.join(output_tfrecord_dir, f"{n_split:03d}.tfrecord")
out_f = tf.io.TFRecordWriter(output_tfrecord_path)
outfiles.append(out_f)
for record in tqdm(dataset):
sample = tf.train.Example()
record = record.numpy()
sample.ParseFromString(record)
idx = int(hashlib.sha1(record).hexdigest(),16) % n_splits
outfiles[idx].write(example.SerializeToString())
for file in outfiles:
file.close()