How to convert this Keras code to Chainer code? (LSTM Autoencoder)

Here I have an LSTM autoencoder written in Keras. I want to convert the code to Chainer.

import numpy as np
from keras.layers import Input, GRU
from keras.models import Model

input_feat = Input(shape=(30, 2000))
l = GRU( 100, return_sequences=True, activation="tanh", recurrent_activation="hard_sigmoid")(input_feat)
l = GRU(2000, return_sequences=True, activation="tanh", recurrent_activation="hard_sigmoid")(l)
model = Model(input_feat, l)
model.compile(optimizer="RMSprop", loss="mean_squared_error")

feat = np.load("feat.npy")
model.fit(feat, feat[:, ::-1, :], epochs=200, batch_size=250)

feat is a numpy array of shape (269, 30, 2000). I can run the code above and the results are reasonable. I wrote the Chainer code below.

import numpy as np
from chainer import Chain, Variable, optimizers
import chainer.functions as F
import chainer.links as L

class GRUAutoEncoder(Chain):
    def __init__(self):
        super().__init__()
        with self.init_scope():
            self.encode = L.GRU(2000, 100)
            self.decode = L.GRU(100, 2000)

    def __call__(self, h, mode):
        if mode == "encode":
            h = F.tanh(self.encode(h))
            return h 

        if mode == "decode":
            h = F.tanh(self.decode(h))
            return h

    def reset(self):
        self.encode.reset_state()
        self.decode.reset_state()

def main():
    feat = np.load("feat.npy") #(269, 30, 2000)
    frame_rate = 30 # number of time steps per sample

    gru_autoencoder = GRUAutoEncoder()
    optimizer = optimizers.RMSprop(lr=0.01)
    optimizer.setup(gru_autoencoder)

    N = len(feat)
    batch_size = 250
    for epoch in range(200):
        index = np.random.randint(0, N-batch_size+1)
        input_splices = feat[index:index+batch_size] #(250, 30, 2000)
        #Encoding
        input_vector = np.zeros((30, batch_size, 2000), dtype="float32")
        h = []
        for i in range(frame_rate):
            input_vector[i] = input_splices[:, i, :] #(250, 2000)
            tmp = Variable(input_vector[i])
            h.append(gru_autoencoder(tmp, "encode")) #(250, 100)

        #Decoding
        output_vector = []
        for i in range(frame_rate):
            tmp = h[i]
            output_vector.append(gru_autoencoder(tmp, "decode"))

        x = input_vector[0]
        t = output_vector[0]
        for i in range(len(output_vector)):
            x = F.concat((x,input_vector[i]), axis=1)
            t = F.concat((t,output_vector[i]), axis=1)

        loss = F.mean_squared_error(x, t)
        gru_autoencoder.cleargrads()
        loss.backward()
        optimizer.update()
        gru_autoencoder.reset()

if __name__ == "__main__":
    main()

However, the results of the code above are not reasonable. I think there is a problem with the Chainer code, but I cannot find where it is.

In the Keras code,

model.fit(feat, feat[:, ::-1, :])

so I tried reversing output_vector in the Chainer code,

output_vector.reverse()

but the results are still not reasonable.

Note: this answer is a translation of [the Japanese SO answer](https://ja.whosebug.com/questions/52162/keras%E3%81%AE%E3%82%B3%E3%83%BC%E3%83%89%E3%82%92chainer%E3%81%AB%E6%9B%B8%E3%81%8D%E6%8F%9B%E3%81%88%E3%81%9F%E3%81%84lstm-autoencoder%E3%81%AE%E5%AE%9F%E8%A3%85/52213#52213).

  1. You should avoid L.GRU and use L.NStepGRU instead, because with L.GRU you have to write "recurrence-aware" code: L.GRU must be applied once per time step, so the "batch" dimension has to be handled very carefully. L.NStepGRU (with n_layers=1) wraps that batch handling for you, so it is user-friendly.
  2. An instance of L.NStepGRU takes two input arguments: one is the initial state, and the other is a list of time series making up a batch. By convention, the initial state is None. A minimal sketch of this calling convention is shown below.
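
A minimal sketch of that calling convention (shapes here are illustrative only, not part of the answer's code):

import numpy as np
import chainer.links as L

gru = L.NStepGRU(n_layers=1, in_size=2000, out_size=100, dropout=0)

# xs is a list with one entry per sample in the batch; each entry is an
# array of shape (sequence_length, in_size), and lengths may differ.
xs = [np.random.randn(30, 2000).astype(np.float32) for _ in range(4)]

# hx=None means "start from a zero initial state" by convention.
hy, ys = gru(hx=None, xs=xs)
# hy: final hidden state of shape (n_layers, batch, out_size) -> (1, 4, 100)
# ys: list of per-sample output sequences, each of shape (30, 100)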

A complete answer to your question therefore looks like the following.

### dataset.py
from chainer.dataset import DatasetMixin

import numpy as np


class MyDataset(DatasetMixin):
    N_SAMPLES = 269
    N_TIMESERIES = 30
    N_DIMS = 2000

    def __init__(self):
        super().__init__()
        self.data = np.random.randn(self.N_SAMPLES, self.N_TIMESERIES, self.N_DIMS) \
            .astype(np.float32)

    def __len__(self):
        return self.N_SAMPLES

    def get_example(self, i):
        return self.data[i, :, :]
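
MyDataset can be sanity-checked in isolation (illustrative; assumes dataset.py is importable):

from dataset import MyDataset

dataset = MyDataset()
print(len(dataset))     # 269 samples
print(dataset[0].shape) # (30, 2000): one sample as (time, dim)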


### model.py
import chainer
from chainer import links as L
from chainer import functions as F
from chainer.link import Chain


class MyModel(Chain):
    N_IN_CHANNEL = 2000
    N_HIDDEN_CHANNEL = 100
    N_OUT_CHANNEL = 2000

    def __init__(self):
        super().__init__()
        with self.init_scope():
            # register the links so their parameters are found by the optimizer
            self.encoder = L.NStepGRU(n_layers=1, in_size=self.N_IN_CHANNEL, out_size=self.N_HIDDEN_CHANNEL, dropout=0)
            self.decoder = L.NStepGRU(n_layers=1, in_size=self.N_HIDDEN_CHANNEL, out_size=self.N_OUT_CHANNEL, dropout=0)

    def to_gpu(self, device=None):
        self.encoder.to_gpu(device)
        self.decoder.to_gpu(device)

    def to_cpu(self):
        self.encoder.to_cpu()
        self.decoder.to_cpu()

    @staticmethod
    def flip_list(source_list):
        # each source has shape (time, dim), so flip axis 0 to reverse time
        return [F.flip(source, axis=0) for source in source_list]

    def __call__(self, source_list):
        """
        .. note:
            This implementation makes use of "auto-encoding"
            by avoiding redundant copy in GPU device.
            In the typical implementation, this function should receive
            both of ``source_list`` and ``target_list``.
        """
        target_list = self.flip_list(source_list)
        _, h_list = self.encoder(hx=None, xs=source_list)
        _, predicted_list = self.decoder(hx=None, xs=h_list)
        diff_list = [F.mean_squared_error(target, predicted).reshape((1,)) for target, predicted in zip(target_list, predicted_list)]
        loss = F.sum(F.concat(diff_list, axis=0))

        chainer.report({'loss': loss}, self)

        return loss
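
A quick smoke test of MyModel (illustrative only; assumes model.py is importable and runs on CPU):

import numpy as np
from model import MyModel

model = MyModel()
# a batch is a list of per-sample arrays of shape (N_TIMESERIES, N_DIMS)
batch = [np.random.randn(30, 2000).astype(np.float32) for _ in range(2)]
loss = model(batch)
print(float(loss.data))  # scalar loss summed over the batch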


### converter.py (based on examples/seq2seq/seq2seq.py)
from chainer.dataset import to_device


def convert(batch, device):
    """
    .. note:
        batch must be list(batch_size) of array
    """
    if device is None:
        return batch
    else:
        return [to_device(device, x) for x in batch]


### train.py
from chainer.iterators import SerialIterator
from chainer.optimizers import RMSprop
from chainer.training.updaters import StandardUpdater
from chainer.training.trainer import Trainer

from dataset import MyDataset
from model import MyModel
from converter import convert

dataset = MyDataset()

BATCH_SIZE = 32
iterator = SerialIterator(dataset, BATCH_SIZE)

model = MyModel()
model.to_gpu(0)  # move parameters to GPU 0 to match device=0 below
optimizer = RMSprop()
optimizer.setup(model)

updater = StandardUpdater(iterator, optimizer, convert, device=0)
trainer = Trainer(updater, (100, 'iteration'))

from chainer.training.extensions import snapshot_object
trainer.extend(snapshot_object(model, "model_iter_{.updater.iteration}"), trigger=(10, 'iteration'))

from chainer.training.extensions import LogReport, PrintReport, ProgressBar
trainer.extend(LogReport(['epoch', 'iteration', 'main/loss'], (1, 'iteration')))
trainer.extend(PrintReport(['epoch', 'iteration', 'main/loss']), trigger=(1, 'iteration'))
trainer.extend(ProgressBar(update_interval=1))

trainer.run()
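
To reuse a saved snapshot later, one might restore it like this (illustrative; the file name "model_iter_100" assumes the run reached iteration 100):

import numpy as np
from chainer import serializers
from model import MyModel

model = MyModel()
serializers.load_npz("model_iter_100", model)

# encode one sample; each xs entry has shape (time, dim)
sample = [np.random.randn(30, 2000).astype(np.float32)]
_, encoded = model.encoder(hx=None, xs=sample)
print(encoded[0].shape)  # (30, 100): the 100-dim code at each time step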