Using RNN Trained Model without pytorch installed

I trained an RNN model with PyTorch. Because of some strange glibc dependency issues, I need to use the model for prediction in an environment where PyTorch cannot be installed. However, I can install libraries such as numpy and scipy. So I would like to use the trained model, with the network definition, without PyTorch.

When I save the model with its state dict and weights in the standard way, I have the model's weights, but I could also save them using just json/pickle files or similar.
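
For example, here is a minimal sketch (assuming a trained model object named model; the file name is illustrative) that exports the state dict as plain numpy arrays in a pickle file, so it can later be loaded without torch:

import pickle

# `model` is assumed to be the trained RNN instance from below.
# Each tensor in the state dict becomes a plain numpy array.
state = {k: v.numpy() for k, v in model.state_dict().items()}
with open('rnn_weights.pkl', 'wb') as f:
    pickle.dump(state, f)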

I also have the network definition, which depends on PyTorch in quite a few ways. Here is my RNN network definition:

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random

torch.manual_seed(1)
random.seed(1)
device = torch.device('cpu')

class RNN(nn.Module):
  def __init__(self, input_size, hidden_size, output_size,num_layers, matching_in_out=False, batch_size=1):
    super(RNN, self).__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.num_layers = num_layers
    self.batch_size = batch_size
    self.matching_in_out = matching_in_out #length of input vector matches the length of output vector 
    self.lstm = nn.LSTM(input_size, hidden_size,num_layers)
    self.hidden2out = nn.Linear(hidden_size, output_size)
    self.hidden = self.init_hidden()
  def forward(self, feature_list):
    feature_list=torch.tensor(feature_list)
    
    if self.matching_in_out:
      lstm_out, _ = self.lstm( feature_list.view(len( feature_list), 1, -1))
      output_space = self.hidden2out(lstm_out.view(len( feature_list), -1))
      output_scores = torch.sigmoid(output_space) #we'll need to check if we need this sigmoid
      return output_scores #output_scores
    else:
      for i in range(len(feature_list)):
        cur_ft_tensor=feature_list[i]#.view([1,1,self.input_size])
        cur_ft_tensor=cur_ft_tensor.view([1,1,self.input_size])
        lstm_out, self.hidden = self.lstm(cur_ft_tensor, self.hidden)
        outs=self.hidden2out(lstm_out)
      return outs
  def init_hidden(self):
    #return torch.rand(self.num_layers, self.batch_size, self.hidden_size)
    return (torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device),
            torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device))

I am willing to go as low-level as necessary: I can use numpy arrays instead of tensors, reshape instead of view, and I don't need the device setting.

From the class definition above, I can see that I only need the following components from torch to get an output from the forward function: nn.LSTM, nn.Linear, and torch.sigmoid.

I think I can easily implement the sigmoid function using numpy. However, can I implement nn.LSTM and nn.Linear using something that doesn't involve PyTorch? And how would I use the weights from the state dict in the new class?

So the question is: how can I "translate" this RNN definition into a class that doesn't need PyTorch, and how do I use the state dict weights in it? Alternatively, is there a "lite" version of PyTorch that I could use just to run the model and produce results?

EDIT:

I think it would be useful to include numpy/scipy equivalents for both nn.LSTM and nn.Linear. That would let us compare the numpy output with the torch output for the same code, and give us some modular code/functions to use. Specifically, a numpy equivalent of the following would be great:

rnn = nn.LSTM(10, 20, 2)        # input_size=10, hidden_size=20, num_layers=2
input = torch.randn(5, 3, 10)   # (seq_len, batch, input_size)
h0 = torch.randn(2, 3, 20)      # (num_layers, batch, hidden_size)
c0 = torch.randn(2, 3, 20)      # (num_layers, batch, hidden_size)
output, (hn, cn) = rnn(input, (h0, c0))

And one for the linear layer:

m = nn.Linear(20, 30)           # in_features=20, out_features=30
input = torch.randn(128, 20)    # (batch, in_features)
output = m(input)               # (batch, out_features)
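
For the linear layer, the numpy equivalent is just an affine map. A minimal sketch, assuming weight and bias were taken from m.state_dict() and converted to numpy:

import numpy as np

def linear_forward(x, weight, bias):
    # nn.Linear computes y = x @ W^T + b, where weight has shape
    # (out_features, in_features) and bias has shape (out_features,).
    return x @ weight.T + bias

With weight of shape (30, 20) and bias of shape (30,), a (128, 20) input gives a (128, 30) output, matching the torch example above.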

You should try exporting the model with torch.onnx (https://pytorch.org/docs/stable/onnx.html). That page gives you an example you can start from.

An alternative is to use TorchScript, but that requires the torch libraries.

Both of these can be run without Python. You can load a TorchScript model in a C++ application: https://pytorch.org/tutorials/advanced/cpp_export.html

ONNX is much more portable, and you can use it from languages such as C#, Java, or JavaScript via https://onnxruntime.ai/ (even in the browser).

A running example

This is your example, slightly modified to fix the errors I found.

Note that with tracing, any if/elif/else, for, or while is unrolled, so the exported graph is specific to the control-flow path taken by the example inputs.

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import random

torch.manual_seed(1)
random.seed(1)
device = torch.device('cpu')

class RNN(nn.Module):
  def __init__(self, input_size, hidden_size, output_size,num_layers, matching_in_out=False, batch_size=1):
    super(RNN, self).__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.output_size = output_size
    self.num_layers = num_layers
    self.batch_size = batch_size
    self.matching_in_out = matching_in_out #length of input vector matches the length of output vector 
    self.lstm = nn.LSTM(input_size, hidden_size,num_layers)
    self.hidden2out = nn.Linear(hidden_size, output_size)
  def forward(self, x, h0, c0):
    lstm_out, (hidden_a, hidden_b) = self.lstm(x, (h0, c0))
    outs=self.hidden2out(lstm_out)
    return outs, (hidden_a, hidden_b)
  def init_hidden(self):
    #return torch.rand(self.num_layers, self.batch_size, self.hidden_size)
    return (torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device).detach(),
            torch.rand(self.num_layers, self.batch_size, self.hidden_size).to(device).detach())

# Wrapper that unpacks the dict of arguments passed in the onnx.export call
class MWrapper(nn.Module):
    def __init__(self, model):
        super(MWrapper, self).__init__()
        self.model = model
    def forward(self, kwargs):
        return self.model(**kwargs)

Run an example:

rnn = RNN(10, 10, 10, 3)
X = torch.randn(3,1,10)
h0,c0  = rnn.init_hidden()
print(rnn(X, h0, c0)[0])

Trace the model with the same inputs and export the ONNX file:

torch.onnx.export(MWrapper(rnn), {'x':X, 'h0':h0, 'c0':c0}, 'rnn.onnx',
                  dynamic_axes={'x':  {1: 'N'},
                                'c0': {1: 'N'},
                                'h0': {1: 'N'}
                                },
                  input_names=['x', 'h0', 'c0'],
                  output_names=['y', 'hn', 'cn']
                 )

Note that you can use symbolic names for the dimensions of some axes of some inputs. Dimensions left unspecified will be fixed to the values from the traced inputs. By default, LSTM uses dimension 1 as the batch.

Next we load the ONNX model and pass it the same inputs:

import onnxruntime
ort_model = onnxruntime.InferenceSession('rnn.onnx')
print(ort_model.run(['y'], {'x':X.numpy(), 'c0':c0.numpy(), 'h0':h0.numpy()}))
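
As a quick sanity check (a sketch, not part of the original example), the ONNX Runtime output can be compared against the torch output for the same inputs:

import numpy as np

# The two outputs should agree to within float32 tolerance.
torch_y = rnn(X, h0, c0)[0].detach().numpy()
ort_y = ort_model.run(['y'], {'x': X.numpy(), 'c0': c0.numpy(), 'h0': h0.numpy()})[0]
print(np.allclose(torch_y, ort_y, atol=1e-5))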

Basically, implementing it in numpy and copying the weights over from your PyTorch model will do the trick. For your use case you only need a forward pass, so that is all we need to implement.

#Set Parameters for a small LSTM network
input_size  = 2 # size of one 'event', or sample, in our batch of data
hidden_dim  = 3 # 3 cells in the LSTM layer
output_size = 1 # desired model output

num_layers=3
torch_lstm = RNN( input_size, 
                 hidden_dim ,
                 output_size,
                 num_layers,
                 matching_in_out=True
                 )

state = torch_lstm.state_dict() # state will capture the weights of your model
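
To see what we will be unpacking below, it helps to peek at the keys and shapes in the state dict (a quick sketch):

# Each lstm.weight_ih_l{k} stacks the four gate blocks along dim 0,
# so with hidden_dim = 3 it has 4*3 = 12 rows.
for name, tensor in state.items():
    print(name, tuple(tensor.shape))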

Now, for the LSTM in numpy, we will use the following functions, taken from this link: https://towardsdatascience.com/the-lstm-reference-card-6163ca98ae87

### NOT MY CODE
import numpy as np 
from scipy.special import expit as sigmoid

def forget_gate(x, h, Weights_hf, Bias_hf, Weights_xf, Bias_xf, prev_cell_state):
    forget_hidden  = np.dot(Weights_hf, h) + Bias_hf
    forget_eventx  = np.dot(Weights_xf, x) + Bias_xf
    return np.multiply( sigmoid(forget_hidden + forget_eventx), prev_cell_state )

def input_gate(x, h, Weights_hi, Bias_hi, Weights_xi, Bias_xi, Weights_hl, Bias_hl, Weights_xl, Bias_xl):
    ignore_hidden  = np.dot(Weights_hi, h) + Bias_hi
    ignore_eventx  = np.dot(Weights_xi, x) + Bias_xi
    learn_hidden   = np.dot(Weights_hl, h) + Bias_hl
    learn_eventx   = np.dot(Weights_xl, x) + Bias_xl
    return np.multiply( sigmoid(ignore_eventx + ignore_hidden), np.tanh(learn_eventx + learn_hidden) )


def cell_state(forget_gate_output, input_gate_output):
    return forget_gate_output + input_gate_output

  
def output_gate(x, h, Weights_ho, Bias_ho, Weights_xo, Bias_xo, cell_state):
    out_hidden = np.dot(Weights_ho, h) + Bias_ho
    out_eventx = np.dot(Weights_xo, x) + Bias_xo
    return np.multiply( sigmoid(out_eventx + out_hidden), np.tanh(cell_state) )

We also need a sigmoid function (scipy.special.expit, imported above, would work too), so:

def sigmoid(x):
    return 1/(1 + np.exp(-x))
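
As an aside (not from the original answer): np.exp(-x) can overflow for large negative x. scipy's expit handles this; if scipy is unavailable, a numerically stable variant looks like this:

def stable_sigmoid(x):
    # Only ever exponentiate non-positive values, avoiding overflow for large |x|.
    x = np.asarray(x, dtype=float)
    out = np.empty_like(x)
    pos = x >= 0
    out[pos] = 1.0 / (1.0 + np.exp(-x[pos]))
    ex = np.exp(x[~pos])
    out[~pos] = ex / (1.0 + ex)
    return out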

Because PyTorch stores the weights for all four LSTM gates stacked in a single matrix, we need to break that matrix apart, so we need the following function:

def get_slices(hidden_dim):
    # PyTorch stacks the four gate blocks (input, forget, cell, output)
    # along dim 0, each block spanning hidden_dim rows.
    breaker = hidden_dim * 4
    return [[i, i + hidden_dim] for i in range(0, breaker, hidden_dim)]
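
For example, with hidden_dim = 3 the stacked weight matrix has 12 rows, split as:

print(get_slices(3))  # [[0, 3], [3, 6], [6, 9], [9, 12]]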

Now that the functions for the LSTM are ready, we create an lstm class that copies the weights from the PyTorch class and produces output from them:

class numpy_lstm:
    def __init__( self, layer_num=0, hidden_dim=1, matching_in_out=False):
        self.matching_in_out=matching_in_out
        self.layer_num=layer_num
        self.hidden_dim=hidden_dim
        
    def init_weights_from_pytorch(self, state):
        slices=get_slices(self.hidden_dim)

        #Event (x) Weights and Biases for all gates
        
        lstm_weight_ih='lstm.weight_ih_l'+str(self.layer_num)
        self.Weights_xi = state[lstm_weight_ih][slices[0][0]:slices[0][1]].numpy()  # shape  [h, x]
        self.Weights_xf = state[lstm_weight_ih][slices[1][0]:slices[1][1]].numpy()  # shape  [h, x]
        self.Weights_xl = state[lstm_weight_ih][slices[2][0]:slices[2][1]].numpy()  # shape  [h, x]
        self.Weights_xo = state[lstm_weight_ih][slices[3][0]:slices[3][1]].numpy() # shape  [h, x]

        
        lstm_bias_ih='lstm.bias_ih_l'+str(self.layer_num)
        self.Bias_xi = state[lstm_bias_ih][slices[0][0]:slices[0][1]].numpy()  # shape is [h]
        self.Bias_xf = state[lstm_bias_ih][slices[1][0]:slices[1][1]].numpy()  # shape is [h]
        self.Bias_xl = state[lstm_bias_ih][slices[2][0]:slices[2][1]].numpy()  # shape is [h]
        self.Bias_xo = state[lstm_bias_ih][slices[3][0]:slices[3][1]].numpy()  # shape is [h]
        
        
        lstm_weight_hh='lstm.weight_hh_l'+str(self.layer_num)

        #Hidden state (h) Weights and Biases for all gates
        self.Weights_hi = state[lstm_weight_hh][slices[0][0]:slices[0][1]].numpy()  #shape is [h, h]
        self.Weights_hf = state[lstm_weight_hh][slices[1][0]:slices[1][1]].numpy()  #shape is [h, h]
        self.Weights_hl = state[lstm_weight_hh][slices[2][0]:slices[2][1]].numpy()  #shape is [h, h]
        self.Weights_ho = state[lstm_weight_hh][slices[3][0]:slices[3][1]].numpy() #shape is [h, h]
        
        
        lstm_bias_hh='lstm.bias_hh_l'+str(self.layer_num)

        self.Bias_hi = state[lstm_bias_hh][slices[0][0]:slices[0][1]].numpy()  # shape is [h]
        self.Bias_hf = state[lstm_bias_hh][slices[1][0]:slices[1][1]].numpy()  # shape is [h]
        self.Bias_hl = state[lstm_bias_hh][slices[2][0]:slices[2][1]].numpy()  # shape is [h]
        self.Bias_ho = state[lstm_bias_hh][slices[3][0]:slices[3][1]].numpy()  # shape is [h]
    def forward_lstm_pass(self,input_data):
        h = np.zeros(self.hidden_dim)
        c = np.zeros(self.hidden_dim)
        
        output_list=[]
        for eventx in input_data:
            f = forget_gate(eventx, h, self.Weights_hf, self.Bias_hf, self.Weights_xf, self.Bias_xf, c)
            i =  input_gate(eventx, h, self.Weights_hi, self.Bias_hi, self.Weights_xi, self.Bias_xi, 
                        self.Weights_hl, self.Bias_hl, self.Weights_xl, self.Bias_xl)
            c = cell_state(f,i)
            h = output_gate(eventx, h, self.Weights_ho, self.Bias_ho, self.Weights_xo, self.Bias_xo, c)
            if self.matching_in_out: # collect every step's hidden state so output length matches input length
                output_list.append(h)
        if self.matching_in_out:
            return output_list
        else:
            return h

Similarly, for the fully connected layer:

class fully_connected_layer:
    def __init__(self, state, dict_name='fc'):
        self.fc_Weight = state[dict_name+'.weight'].numpy()  # shape is [output_size, h]
        self.fc_Bias = state[dict_name+'.bias'].numpy()      # shape is [output_size]

    def forward(self, lstm_output, is_sigmoid=True):
        # works for a single hidden vector of shape [h] as well as for a
        # whole sequence of them of shape [seq_len, h]
        res = np.dot(np.asarray(lstm_output), self.fc_Weight.T) + self.fc_Bias
        if is_sigmoid:
            return sigmoid(res)
        else:
            return res

Now we need a class that calls them all together and generalizes to multiple layers. If you need more fully connected layers, or want to pass a false condition for the sigmoid and so on, you can modify the class below:

class RNN_model_Numpy:
    def __init__(self, state, input_size, hidden_dim, output_size, num_layers, matching_in_out=True):
        self.lstm_layers=[]
        for i in range(0, num_layers):
            # every layer returns the full sequence so it can feed the next layer
            lstm_layer_obj = numpy_lstm(layer_num=i, hidden_dim=hidden_dim, matching_in_out=True)
            lstm_layer_obj.init_weights_from_pytorch(state)
            self.lstm_layers.append(lstm_layer_obj)
        
        self.hidden2out=fully_connected_layer(state, dict_name='hidden2out')
        
    def forward(self, feature_list):
        for x in self.lstm_layers:
            lstm_output=x.forward_lstm_pass(feature_list)
            feature_list=lstm_output
            
        return self.hidden2out.forward(feature_list, is_sigmoid=False)

Sanity check on a numpy variable:

data = np.array(
           [[1,1],
            [2,2],
            [3,3]])

check=RNN_model_Numpy(state, input_size, hidden_dim, output_size, num_layers)
check.forward(data)
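
As an optional cross-check (a sketch, not part of the original answer), you can run the same sequence through the torch model. Note that the torch forward applies a sigmoid while the call above uses is_sigmoid=False, so apply sigmoid to the numpy result before comparing:

import torch

with torch.no_grad():
    torch_out = torch_lstm(data.astype('float32')).numpy()
numpy_out = sigmoid(np.asarray(check.forward(data.astype('float32'))))
print(np.allclose(torch_out, numpy_out, atol=1e-5))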

Explanation: since we only need a forward pass, we implement the functions the LSTM requires: the forget gate, input gate, cell state and output gate. They are just operations on the input you provide.

The get_slices function is used to break apart the weight matrix we get from PyTorch. The state dictionary is the dictionary that contains the weights of all the layers in our network; for the LSTM in particular, they are stacked in the order ignore (input), forget, learn (cell), output. So we need to split the matrix up for the different LSTM gates.

For the numpy_lstm class, we have the init_weights_from_pytorch function, which must be called. What it does is extract the weights from the state dictionary we obtained earlier from the PyTorch model object and fill the numpy array weights with the PyTorch weights. You can train your model first, save the state dictionary via pickle, and then use it.

The fully connected layer class simply implements the hidden2out linear layer.

Finally, our RNN_model_Numpy class makes sure that, if you have multiple layers, the output of one LSTM layer is sent on to the next.

Lastly, a small sanity check is performed on the data variable.

IMPORTANT NOTE: since PyTorch handles its inputs quite differently, you may run into dimension errors, so make sure your numpy input has a shape similar to that of the data variable above.

Important references: https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html

https://christinakouridi.blog/2019/06/19/backpropagation-lstm/