Training GAN in keras with .fit_generator()
I have been training a Pix2Pix-like conditional GAN architecture with the following training loop:
for epoch in range(start_epoch, end_epoch):
    for batch_i, (input_batch, target_batch) in enumerate(dataLoader.load_batch(batch_size)):
        fake_batch = self.generator.predict(input_batch)

        d_loss_real = self.discriminator.train_on_batch(target_batch, valid)
        d_loss_fake = self.discriminator.train_on_batch(fake_batch, invalid)
        d_loss = np.add(d_loss_fake, d_loss_real) * 0.5

        g_loss = self.combined.train_on_batch([target_batch, input_batch], [valid, target_batch])
This works fine, but it is not very efficient, because the data loader quickly becomes the time bottleneck.
I looked into the .fit_generator() function that keras provides, which lets the data generator run in worker threads and therefore runs much faster:
self.combined.fit_generator(generator=trainLoader,
                            validation_data=evalLoader,
                            callbacks=[checkpointCallback, historyCallback],
                            workers=1,
                            use_multiprocessing=True)
It took me a while to discover that this is not correct: I am no longer training my generator and discriminator separately, and the discriminator is not being trained at all, because it is set to trainable = False inside the combined model. That basically destroys any kind of adversarial loss, and I might as well train my generator on its own with an MSE loss.
My question now is whether there is some workaround, for example training my discriminator inside a custom callback that fires on every batch of the .fit_generator() method? Something like creating a custom callback:
class MyCustomCallback(tf.keras.callbacks.Callback):
    def on_train_batch_end(self, batch, logs=None):
        discriminator.train_on_batch()
Another possibility would be to parallelize the original training loop, but I'm afraid I don't have time to do that right now.
Update: there are built-in enqueuers for this:
tf.keras.utils.SequenceEnqueuer
tf.keras.utils.OrderedEnqueuer
You can see a quick way of using them in this answer:
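For illustration only, a minimal sketch of how an OrderedEnqueuer can wrap a keras.utils.Sequence in a custom loop (train_sequence and n_epochs are assumed placeholder names, not part of the original answer):

import tensorflow as tf

# train_sequence is assumed to be a keras.utils.Sequence; n_epochs is a placeholder
enqueuer = tf.keras.utils.OrderedEnqueuer(train_sequence, use_multiprocessing=True, shuffle=True)
enqueuer.start(workers=4, max_queue_size=10)
output_generator = enqueuer.get()  # yields batches indefinitely, in order

for epoch in range(n_epochs):
    for step in range(len(train_sequence)):
        input_batch, target_batch = next(output_generator)
        # run your own discriminator/generator train_on_batch calls here

enqueuer.stop()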
Old answer:
I created this parallelized iterator exactly for that purpose. I use it in my training.
This is how you use it:
for epoch, batchIndex, originalBatchIndex, xAndY in ParallelIterator(
                                       generator,
                                       epochs,
                                       shuffle_bool,
                                       use_on_epoch_end_from_generator_bool,
                                       workers = 8,
                                       queue_size = 10):
    #loop content
    x_train_batch, y_train_batch = xAndY
    model.train_on_batch(x_train_batch, y_train_batch)
Here generator should be your dataloader, but it needs to be a keras.utils.Sequence, not just a yield generator (a minimal wrapper sketch follows after the list below).
If you do need a plain yield generator, adapting the iterator is not very complicated. (I just don't know whether it will parallelize properly, though; I don't know whether yield loops can be parallelized correctly.)
To adapt it, in the iterator definition below you would replace:
- len(keras_sequence) with steps_per_epoch
- keras_sequence[i] with next(keras_sequence)
- and simply set use_on_epoch_end = False
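If instead you want to stay with a keras.utils.Sequence but your dataloader is not one yet, a minimal wrapper could look like this sketch (inputs and targets are assumed to be in-memory arrays; replace the slicing with your own loading code):

import numpy as np
from tensorflow import keras

class BatchSequence(keras.utils.Sequence):
    def __init__(self, inputs, targets, batch_size):
        self.inputs = inputs          # assumed numpy arrays (could also be file paths)
        self.targets = targets
        self.batch_size = batch_size

    def __len__(self):
        # number of batches per epoch
        return int(np.ceil(len(self.inputs) / self.batch_size))

    def __getitem__(self, idx):
        # load/slice exactly one batch
        s = slice(idx * self.batch_size, (idx + 1) * self.batch_size)
        return self.inputs[s], self.targets[s]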
And here is the iterator definition:
import multiprocessing.dummy as mp
import numpy as np

#A generator that wraps a Keras Sequence and simulates a `fit_generator` behavior for custom training loops
#It will also work with any iterator that has `__len__` and `__getitem__`.
def ParallelIterator(keras_sequence, epochs, shuffle, use_on_epoch_end, workers = 4, queue_size = 10):

    sourceQueue = mp.Queue()                     #queue for getting batch indices
    batchQueue = mp.Queue(maxsize = queue_size)  #queue for getting actual batches
    indices = np.arange(len(keras_sequence))     #array of indices to be shuffled

    use_on_epoch_end = 'on_epoch_end' in dir(keras_sequence) if use_on_epoch_end == True else False
    batchesLeft = 0

#     printQueue = mp.Queue()                    #queue for printing messages

#     import threading
#     screenLock = threading.Semaphore(value=1)
#     totalWorkers= 0

#     def printer():
#         nonlocal printQueue, printing
#         while printing:
#             while not printQueue.empty():
#                 text = printQueue.get(block=True)
#                 screenLock.acquire()
#                 print(text)
#                 screenLock.release()

    #fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
    def fillSource():
        nonlocal batchesLeft

#         printQueue.put("Iterator: fill source - source qsize = " + str(sourceQueue.qsize()))

        if shuffle == True:
            np.random.shuffle(indices)

        #puts the indices in the indices queue
        batchesLeft += len(indices)
#         printQueue.put("Iterator: batches left:" + str(batchesLeft))
        for i in indices:
            sourceQueue.put(i)

    #function that will load batches from the Keras Sequence
    def worker():
        nonlocal sourceQueue, batchQueue, keras_sequence, batchesLeft
#         nonlocal printQueue, totalWorkers
#         totalWorkers += 1
#         thisWorker = totalWorkers

        while True:
#             printQueue.put('Worker: ' + str(thisWorker) + ' will try to get item')
            index = sourceQueue.get(block = True) #get index from the queue
#             printQueue.put('Worker: ' + str(thisWorker) + ' got item ' + str(index) + " - source q size = " + str(sourceQueue.qsize()))

            if index is None:
                break

            item = keras_sequence[index] #get batch from the sequence
            batchesLeft -= 1
#             printQueue.put('Worker: ' + str(thisWorker) + ' batches left ' + str(batchesLeft))

            batchQueue.put((index,item), block=True) #puts batch in the batch queue
#             printQueue.put('Worker: ' + str(thisWorker) + ' added item ' + str(index) + ' - queue: ' + str(batchQueue.qsize()))

#         printQueue.put("hitting end of worker" + str(thisWorker))

#     #printing pool that will print messages from the print queue
#     printing = True
#     printPool = mp.Pool(1, printer)

    #creates the thread pool that will work automatically as we get from the batch queue
    pool = mp.Pool(workers, worker)
    fillSource()   #at this point, data starts being taken and stored in the batchQueue

    #generation loop
    for epoch in range(epochs):

        #if not waiting for epoch end synchronization, always keeps 1 epoch filled ahead
        if (use_on_epoch_end == False):
            if epoch + 1 < epochs: #only fill if not last epoch
                fillSource()

        for batch in range(len(keras_sequence)):

            #if waiting for epoch end synchronization, wait for workers to have no batches left to get, then call epoch end and fill
            if use_on_epoch_end == True:
                if batchesLeft == 0:
                    keras_sequence.on_epoch_end()
                    if epoch + 1 < epochs: #only fill if not last epoch
                        fillSource()
                    else:
                        batchesLeft = -1   #in the last epoch, prevents from calling epoch end again and again

            #yields batches for the outside loop that is using this generator
            originalIndex, batchItems = batchQueue.get(block = True)
            yield epoch, batch, originalIndex, batchItems

#     print("iterator epoch end")
#     printQueue.put("closing threads")

    #terminating the pool - add None to the queue so any blocked worker gets released
    for i in range(workers):
        sourceQueue.put(None)
    pool.terminate()
    pool.close()
    pool.join()
#     printQueue.put("terminated")

#     printing = False
#     printPool.terminate()
#     printPool.close()
#     printPool.join()

    del pool,sourceQueue,batchQueue
#     del printPool, printQueue
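As an example of how this connects back to the question, the original Pix2Pix-style loop could be driven by this iterator roughly as follows (a sketch reusing the names from the question; dataLoader is assumed to be a keras.utils.Sequence returning (input_batch, target_batch) pairs, and n_epochs is a placeholder):

for epoch, batch_i, original_i, (input_batch, target_batch) in ParallelIterator(
                                       dataLoader, n_epochs, True, False,
                                       workers=8, queue_size=10):
    fake_batch = self.generator.predict(input_batch)

    # train the discriminator on real and fake batches
    d_loss_real = self.discriminator.train_on_batch(target_batch, valid)
    d_loss_fake = self.discriminator.train_on_batch(fake_batch, invalid)
    d_loss = np.add(d_loss_fake, d_loss_real) * 0.5

    # train the generator through the combined model
    g_loss = self.combined.train_on_batch([target_batch, input_batch], [valid, target_batch])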
Although your problem already has a solution, I would like to answer your original question of whether you can train the discriminator from a custom callback while fitting the combined model.
The short answer is yes.
Be careful when compiling the models (discriminator and combined model) and follow the steps described here:
https://github.com/keras-team/keras/issues/8585#issuecomment-385729276
Call fit or fit_generator on the combined model:
combined_model.fit_generator(train_loader, epochs=epochs, callbacks=[gan_callback])
Here gan_callback is your custom callback class that overrides on_batch_end, in which you call (as you described):
def on_batch_end(self, batch_idx, logs=None):
    logs_disc = model_disc.train_on_batch(x, y)
To get the discriminator model inside the callback, you can either pass it as an argument when the callback is constructed, or retrieve it via the inherited self.model variable (model.layers).
I find this solution quite elegant when you want to output losses and metrics to tensorboard.
Inside the on_batch_end function of gan_callback you then have two logs (containing loss values and metrics) at hand (a sketch follows after this list):
- logs_disc from the discriminator
- logs from the generator, which is the parameter of on_batch_end()
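A minimal sketch of such a callback, assuming the discriminator and a batch provider are passed in at construction time (next_disc_batch is a hypothetical callable returning an (x, y) pair for the discriminator; it is not part of the original answer):

import tensorflow as tf

class GANCallback(tf.keras.callbacks.Callback):
    def __init__(self, discriminator, next_disc_batch):
        super().__init__()
        self.discriminator = discriminator      # passed in explicitly instead of digging it out of self.model.layers
        self.next_disc_batch = next_disc_batch  # hypothetical callable returning (x, y) for the discriminator

    def on_batch_end(self, batch_idx, logs=None):
        x, y = self.next_disc_batch()
        logs_disc = self.discriminator.train_on_batch(x, y)
        if logs is not None:
            # merge the discriminator loss into the generator logs, e.g. for tensorboard/custom logging
            logs['d_loss'] = logs_disc[0] if isinstance(logs_disc, (list, tuple)) else logs_disc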
Depending on your configuration, this may produce a warning that can be ignored:
UserWarning: Method on_batch_end() is slow compared to the batch update (0.151899). Check your callbacks.