如何使用张量流或火花拆分高度不平衡的数据?
how to split highly imbalanced data using tensor flow or spark?
数据-
我拥有的训练和测试数据非常大~ 150gb并且高度不平衡 99% neg labels/ 1% pos labels,我不能下采样,因为它也是一个非常重要的信息,所以目前使用加权估计器。
问题-
如果我们使用 spark 方法使用 sample() 函数进行拆分并保存到多个文件,那么很可能只有一个文件中有一个负样本(比如 100 个文件中有一个),这会导致摄取数据时出现问题,因为只有正样本被提供给导致零损失和模型无法学习的估计器。
此外,我在制作批处理时确实使用了 shuffle,但输入函数将多个文件作为输入,因此批处理是通过从每个文件中的数据进行混洗来创建的,这导致模型在非常非常非常长的时间内仅提供否定案例很长一段时间,直到对具有负样本的文件进行随机播放。
有没有更好的方法来确保在使用 pyspark 保存数据时,spark 保存的每个文件都有来自 both classes/labels(最好与整体数据pos/neg的比例相同)?
我曾尝试使用一个大文件进行馈送,在这些情况下随机播放效果很好,但是当我们馈送很多文件时,它会产生零丢失的问题,因为只有来自一个 class 的样本被馈送到型号。
在tensorflow代码中使用以下输入函数-
def csv_input_fn(files_name_pattern, mode=tf.estimator.ModeKeys.EVAL,
skip_header_lines=0,
num_epochs=None,
batch_size=1000):
shuffle = True if mode == tf.estimator.ModeKeys.TRAIN else False
num_threads = multiprocessing.cpu_count() if MULTI_THREADING else 1
print("")
print("* data input_fn:")
print("================")
print("Input file(s): {}".format(files_name_pattern))
print("Batch size: {}".format(batch_size))
print("Epoch Count: {}".format(num_epochs))
print("Mode: {}".format(mode))
print("Thread Count: {}".format(num_threads))
print("Shuffle: {}".format(shuffle))
print("================")
print("")
file_names = tf.matching_files(files_name_pattern)
dataset = data.TextLineDataset(filenames=file_names)
dataset = dataset.skip(skip_header_lines)
if shuffle:
dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)
dataset = dataset.batch(batch_size)
dataset = dataset.map(lambda csv_row: parse_csv_row(csv_row),
num_parallel_calls=num_threads)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()
features, target = iterator.get_next()
return features, target
如有任何建议,我们将不胜感激!谢谢
找到了我自己问题的答案,因此您可以将 buffer_size 更改为完整数据集中 elements/rows 的数量,这样我们就可以确保索引是使用洗牌随机分配将是统一的,因为现在洗牌是使用整个数据集完成的。
代码已更改 -
if shuffle:
dataset = dataset.shuffle(buffer_size='total training instances size')
数据-
我拥有的训练和测试数据非常大~ 150gb并且高度不平衡 99% neg labels/ 1% pos labels,我不能下采样,因为它也是一个非常重要的信息,所以目前使用加权估计器。
问题-
如果我们使用 spark 方法使用 sample() 函数进行拆分并保存到多个文件,那么很可能只有一个文件中有一个负样本(比如 100 个文件中有一个),这会导致摄取数据时出现问题,因为只有正样本被提供给导致零损失和模型无法学习的估计器。
此外,我在制作批处理时确实使用了 shuffle,但输入函数将多个文件作为输入,因此批处理是通过从每个文件中的数据进行混洗来创建的,这导致模型在非常非常非常长的时间内仅提供否定案例很长一段时间,直到对具有负样本的文件进行随机播放。
有没有更好的方法来确保在使用 pyspark 保存数据时,spark 保存的每个文件都有来自 both classes/labels(最好与整体数据pos/neg的比例相同)?
我曾尝试使用一个大文件进行馈送,在这些情况下随机播放效果很好,但是当我们馈送很多文件时,它会产生零丢失的问题,因为只有来自一个 class 的样本被馈送到型号。
在tensorflow代码中使用以下输入函数-
def csv_input_fn(files_name_pattern, mode=tf.estimator.ModeKeys.EVAL,
skip_header_lines=0,
num_epochs=None,
batch_size=1000):
shuffle = True if mode == tf.estimator.ModeKeys.TRAIN else False
num_threads = multiprocessing.cpu_count() if MULTI_THREADING else 1
print("")
print("* data input_fn:")
print("================")
print("Input file(s): {}".format(files_name_pattern))
print("Batch size: {}".format(batch_size))
print("Epoch Count: {}".format(num_epochs))
print("Mode: {}".format(mode))
print("Thread Count: {}".format(num_threads))
print("Shuffle: {}".format(shuffle))
print("================")
print("")
file_names = tf.matching_files(files_name_pattern)
dataset = data.TextLineDataset(filenames=file_names)
dataset = dataset.skip(skip_header_lines)
if shuffle:
dataset = dataset.shuffle(buffer_size=2 * batch_size + 1)
dataset = dataset.batch(batch_size)
dataset = dataset.map(lambda csv_row: parse_csv_row(csv_row),
num_parallel_calls=num_threads)
dataset = dataset.repeat(num_epochs)
iterator = dataset.make_one_shot_iterator()
features, target = iterator.get_next()
return features, target
如有任何建议,我们将不胜感激!谢谢
找到了我自己问题的答案,因此您可以将 buffer_size 更改为完整数据集中 elements/rows 的数量,这样我们就可以确保索引是使用洗牌随机分配将是统一的,因为现在洗牌是使用整个数据集完成的。
代码已更改 -
if shuffle:
dataset = dataset.shuffle(buffer_size='total training instances size')