即使使用种子值,tensorflow tf.data.experimental.rejection_resample 也无法实现目标分布
tensorflow tf.data.experimental.rejection_resample does not achieve target distribution even with a seed value
我正在尝试使用以下代码测试拒绝抽样。
这是我得到的结果:
target_dist [0.5, 0.5]
initial distribution [0.8333333333333334, 0.16666666666666666]
result counts [1500, 600]
final dist 0.7142857142857143 0.2857142857142857
最终的分布不反映我设定的目标分布。
有什么想法吗?
import tensorflow as tf
import numpy as np
# everything is based on tensorflow 2.0
tf.random.set_seed(2342)
def map2label(sample):
return tf.cast(tf.math.equal(sample, 2), tf.int32)
np_data = np.array([0,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2])
target_dist = [0.5, 0.5]
init_dist = [(np_data.shape[0]-3)/np_data.shape[0], 3/np_data.shape[0]]
dataset = tf.data.Dataset.from_tensor_slices(np_data)
rej = tf.data.experimental.rejection_resample(map2label, target_dist, init_dist, 2342) # set seed explicitly
dataset = dataset.apply(rej)
bucket_counts = [0, 0]
for i in range(100):
for data in dataset:
class_id, data_content = data
bucket_counts[class_id.numpy()] += 1
print("This is your target_dist", target_dist, "This is your initial distribution", init_dist)
print("This is your result counts", bucket_counts,
"This is your final dist", bucket_counts[0] / np.sum(bucket_counts), bucket_counts[1] / np.sum(bucket_counts))
这与其说是解决此处拒绝抽样问题的解决方案,不如说是一种变通方法。我使用自己的欠采样算法而不是拒绝采样。它的工作方式不同,但是,它达到了目的。我会把这个问题留在这里,因为拒绝抽样在某些情况下会更有效,所以这对其他人仍然有帮助。
# everything below is tensorflow 2.0
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
from termcolor import colored
seed = 1341
tf.random.set_seed(seed)
np.random.seed(seed)
# generate data and distribution
init_dist = [0.9, 0.1]
num_per_class = np.random.multinomial(10000, init_dist)
np_data = np.hstack((np.zeros(num_per_class[0], dtype=np.int32),
np.ones(num_per_class[1], dtype=np.int32)))
np.random.shuffle(np_data)
batch_size = 40
epochs = 1
target_dist = np.array([0.5, 0.5], dtype=np.float32)
number_of_positives = np.sum(np_data)
number_of_negatives = np_data.shape[0] - number_of_positives
number_of_examples = np_data.shape[0]
all_dataset = tf.data.Dataset.from_tensor_slices(np_data)
def filter(dist, seed=seed):
# create a uniform distribution
tf.random.set_seed(seed)
normal = tfp.distributions.Uniform(low=0, high=1.0)
def _filter(sample):
# explanation: uniform distribution has 60% chance larger than 0.4
# so if you want to draw a sample at 60% chance, your tocken
# needs to be larger than (1 - 60%)
tocken = normal.sample()
return tf.cast(tocken > (1 - dist[tf.cast(sample, tf.int32)]), tf.bool)
return _filter
# Undersampling
drawing_prob = np.min(init_dist) / np.array(init_dist)
dist = tf.constant(drawing_prob, tf.float32)
method2_dataset = all_dataset.filter(filter(dist))
method2_dataset = method2_dataset.batch(batch_size)
# Verify dataset is balanced
def compute_distribution(dataset, epochs, batch_size):
data_receives = []
# count stats to verify both batch distribution and total distribution
bucket_counts = [0, 0]
batch_counts = []
for epoch in range(epochs):
data_receives.append([])
for data_contents in dataset:
batch_count = [0, 0]
for j in range(batch_size):
try:
data_content = data_contents.numpy()[j]
data_receives[epoch].append(data_content)
bucket_counts[data_content] += 1
batch_count[data_content] += 1
except IndexError:
break
batch_counts.append(batch_count)
# Batch distribution
for bid, batch in enumerate(batch_counts):
print("batch %03d, batch count [%02d, %02d],
batch distribution [%02.2f, %02.2f]"
% (bid, batch[0], batch[1], batch[0] / np.sum(batch),
batch[1] / np.sum(batch)))
# Total distribution
print(colored("result counts [%d %d]
final distribution [%02.2f, %02.2f]"
% (bucket_counts[0], bucket_counts[1],
bucket_counts[0] / np.sum(bucket_counts),
bucket_counts[1] / np.sum(bucket_counts)),
"green"))
# check if each epoch produces the same order
for i in range(len(data_receives)-1):
if not np.array_equal(np.array(data_receives[i]),
np.array(data_receives[i+1])):
raise ValueError("Order not preserved.")
if epochs > 1:
print(colored("Yes, order is preserved.", "green"))
compute_distribution(method2_dataset, epochs, batch_size)
以下更改使代码对我有用:
无限次重复数据集(或以很大的倍数重复,例如 1000)
通过使用:shuffle = tf.data.experimental.shuffle_and_repeat(1, count=1000)
使用以下方法将无限集剪回所需的长度:dataset = dataset.take(30)
为了获得好的结果,我必须至少采集 30 个样本。看起来像原来的
np_data 的 18 个样本根本不够大。
删除 init_dist 显着改善了平衡
target_dist [0.5, 0.5]
initial distribution [0.8333333333333334, 0.16666666666666666]
result counts [1500, 1500]
final dist 0.5 0.5
import tensorflow as tf
import numpy as np
# everything is based on tensorflow 2.0
tf.random.set_seed(2342)
def map2label(sample):
return tf.cast(tf.math.equal(sample, 2), tf.int32)
np_data = np.array([0, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2])
target_dist = [0.5, 0.5]
init_dist = [(np_data.shape[0] - 3) / np_data.shape[0], 3 / np_data.shape[0]]
dataset = tf.data.Dataset.from_tensor_slices(np_data)
shuffle = tf.data.experimental.shuffle_and_repeat(1, count=1000)
rej = tf.data.experimental.rejection_resample(map2label, target_dist, seed=2342) # set seed explicitly
dataset = dataset.apply(shuffle).apply(rej)
dataset = dataset.take(30)
bucket_counts = [0, 0]
for i in range(100):
for data in dataset:
class_id, data_content = data
bucket_counts[class_id.numpy()] += 1
print("This is your target_dist", target_dist, "This is your initial distribution", init_dist)
print("This is your result counts", bucket_counts,
"This is your final dist", bucket_counts[0] / np.sum(bucket_counts), bucket_counts[1] / np.sum(bucket_counts))
好的,这就是我的做法
nn = 0
for _ in ds: nn +=1
ds_per_class = [ds.repeat().filter(lambda inp,tar: tar==kk) for kk in range(class_nn)]
ds = tf.data.experimental.sample_from_datasets(ds_per_class).take(nn)
关键是通过 class 过滤数据集列表,然后使用 sample_from_dataset 获得统一发行版,这需要对无限数据集进行操作,因此我们做 repeat()之前
如果我们想在 model.fit() 中仍然保持一致,我们需要获取初始 ds.len nn 来执行 take(nn) 以保持原始纪元大小 - 但否则没有必要如果我们不关心 epoch len(例如可以说 take(1000))。
我正在尝试使用以下代码测试拒绝抽样。 这是我得到的结果:
target_dist [0.5, 0.5]
initial distribution [0.8333333333333334, 0.16666666666666666]
result counts [1500, 600]
final dist 0.7142857142857143 0.2857142857142857
最终的分布不反映我设定的目标分布。
有什么想法吗?
import tensorflow as tf
import numpy as np
# everything is based on tensorflow 2.0
tf.random.set_seed(2342)
def map2label(sample):
return tf.cast(tf.math.equal(sample, 2), tf.int32)
np_data = np.array([0,2,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,2])
target_dist = [0.5, 0.5]
init_dist = [(np_data.shape[0]-3)/np_data.shape[0], 3/np_data.shape[0]]
dataset = tf.data.Dataset.from_tensor_slices(np_data)
rej = tf.data.experimental.rejection_resample(map2label, target_dist, init_dist, 2342) # set seed explicitly
dataset = dataset.apply(rej)
bucket_counts = [0, 0]
for i in range(100):
for data in dataset:
class_id, data_content = data
bucket_counts[class_id.numpy()] += 1
print("This is your target_dist", target_dist, "This is your initial distribution", init_dist)
print("This is your result counts", bucket_counts,
"This is your final dist", bucket_counts[0] / np.sum(bucket_counts), bucket_counts[1] / np.sum(bucket_counts))
这与其说是解决此处拒绝抽样问题的解决方案,不如说是一种变通方法。我使用自己的欠采样算法而不是拒绝采样。它的工作方式不同,但是,它达到了目的。我会把这个问题留在这里,因为拒绝抽样在某些情况下会更有效,所以这对其他人仍然有帮助。
# everything below is tensorflow 2.0
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np
from termcolor import colored
seed = 1341
tf.random.set_seed(seed)
np.random.seed(seed)
# generate data and distribution
init_dist = [0.9, 0.1]
num_per_class = np.random.multinomial(10000, init_dist)
np_data = np.hstack((np.zeros(num_per_class[0], dtype=np.int32),
np.ones(num_per_class[1], dtype=np.int32)))
np.random.shuffle(np_data)
batch_size = 40
epochs = 1
target_dist = np.array([0.5, 0.5], dtype=np.float32)
number_of_positives = np.sum(np_data)
number_of_negatives = np_data.shape[0] - number_of_positives
number_of_examples = np_data.shape[0]
all_dataset = tf.data.Dataset.from_tensor_slices(np_data)
def filter(dist, seed=seed):
# create a uniform distribution
tf.random.set_seed(seed)
normal = tfp.distributions.Uniform(low=0, high=1.0)
def _filter(sample):
# explanation: uniform distribution has 60% chance larger than 0.4
# so if you want to draw a sample at 60% chance, your tocken
# needs to be larger than (1 - 60%)
tocken = normal.sample()
return tf.cast(tocken > (1 - dist[tf.cast(sample, tf.int32)]), tf.bool)
return _filter
# Undersampling
drawing_prob = np.min(init_dist) / np.array(init_dist)
dist = tf.constant(drawing_prob, tf.float32)
method2_dataset = all_dataset.filter(filter(dist))
method2_dataset = method2_dataset.batch(batch_size)
# Verify dataset is balanced
def compute_distribution(dataset, epochs, batch_size):
data_receives = []
# count stats to verify both batch distribution and total distribution
bucket_counts = [0, 0]
batch_counts = []
for epoch in range(epochs):
data_receives.append([])
for data_contents in dataset:
batch_count = [0, 0]
for j in range(batch_size):
try:
data_content = data_contents.numpy()[j]
data_receives[epoch].append(data_content)
bucket_counts[data_content] += 1
batch_count[data_content] += 1
except IndexError:
break
batch_counts.append(batch_count)
# Batch distribution
for bid, batch in enumerate(batch_counts):
print("batch %03d, batch count [%02d, %02d],
batch distribution [%02.2f, %02.2f]"
% (bid, batch[0], batch[1], batch[0] / np.sum(batch),
batch[1] / np.sum(batch)))
# Total distribution
print(colored("result counts [%d %d]
final distribution [%02.2f, %02.2f]"
% (bucket_counts[0], bucket_counts[1],
bucket_counts[0] / np.sum(bucket_counts),
bucket_counts[1] / np.sum(bucket_counts)),
"green"))
# check if each epoch produces the same order
for i in range(len(data_receives)-1):
if not np.array_equal(np.array(data_receives[i]),
np.array(data_receives[i+1])):
raise ValueError("Order not preserved.")
if epochs > 1:
print(colored("Yes, order is preserved.", "green"))
compute_distribution(method2_dataset, epochs, batch_size)
以下更改使代码对我有用:
无限次重复数据集(或以很大的倍数重复,例如 1000) 通过使用:shuffle = tf.data.experimental.shuffle_and_repeat(1, count=1000)
使用以下方法将无限集剪回所需的长度:dataset = dataset.take(30) 为了获得好的结果,我必须至少采集 30 个样本。看起来像原来的 np_data 的 18 个样本根本不够大。
删除 init_dist 显着改善了平衡
target_dist [0.5, 0.5]
initial distribution [0.8333333333333334, 0.16666666666666666]
result counts [1500, 1500]
final dist 0.5 0.5
import tensorflow as tf
import numpy as np
# everything is based on tensorflow 2.0
tf.random.set_seed(2342)
def map2label(sample):
return tf.cast(tf.math.equal(sample, 2), tf.int32)
np_data = np.array([0, 2, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2])
target_dist = [0.5, 0.5]
init_dist = [(np_data.shape[0] - 3) / np_data.shape[0], 3 / np_data.shape[0]]
dataset = tf.data.Dataset.from_tensor_slices(np_data)
shuffle = tf.data.experimental.shuffle_and_repeat(1, count=1000)
rej = tf.data.experimental.rejection_resample(map2label, target_dist, seed=2342) # set seed explicitly
dataset = dataset.apply(shuffle).apply(rej)
dataset = dataset.take(30)
bucket_counts = [0, 0]
for i in range(100):
for data in dataset:
class_id, data_content = data
bucket_counts[class_id.numpy()] += 1
print("This is your target_dist", target_dist, "This is your initial distribution", init_dist)
print("This is your result counts", bucket_counts,
"This is your final dist", bucket_counts[0] / np.sum(bucket_counts), bucket_counts[1] / np.sum(bucket_counts))
好的,这就是我的做法
nn = 0
for _ in ds: nn +=1
ds_per_class = [ds.repeat().filter(lambda inp,tar: tar==kk) for kk in range(class_nn)]
ds = tf.data.experimental.sample_from_datasets(ds_per_class).take(nn)
关键是通过 class 过滤数据集列表,然后使用 sample_from_dataset 获得统一发行版,这需要对无限数据集进行操作,因此我们做 repeat()之前
如果我们想在 model.fit() 中仍然保持一致,我们需要获取初始 ds.len nn 来执行 take(nn) 以保持原始纪元大小 - 但否则没有必要如果我们不关心 epoch len(例如可以说 take(1000))。