如何使用 PyTorch 正确实现数据重组?

How to properly implement data reorganization using PyTorch?

这会很长post,提前抱歉...

我正在研究去噪算法,我的目标是:

去噪算法分为以下3部分:

第一部分的思路很简单,但不太容易解释。例如,给定一个输入彩色图像和一个表示图像噪声标准偏差的输入值 "sigma"。 "down-sampling" 部分实际上是 space 深度。简而言之,对于给定的通道和 2x2 像素的子集,space 到深度创建一个由 4 个通道组成的像素。通道数乘以 4,而高度和宽度除以 2。数据只是重新组织。 噪声水平图包括创建 3 个包含标准偏差值的通道,以便 convnet 知道如何正确地对输入图像进行去噪。 使用一些代码可能会更清楚:

def downsample_and_noise_map(input, sigma):

    # Input tensor size (batch, channels, height, width)
    in_n, in_c, in_h, in_w = input.size()

    # Output tensor size
    out_h = in_h // 2
    out_w = in_w // 2
    sigma_c = in_c      # nb of channels of the standard deviation tensor
    image_c = in_c * 4  # nb of channels of the image tensor

    # Standard deviation tensor
    output_sigma = sigma.view(1, 1, 1, 1).repeat(in_n, sigma_c, out_h, out_w)

    # Image tensor
    output_image = torch.zeros((in_n, image_c, out_h, out_w))
    output_image[:, 0::4, :, :] = input[:, :, 0::2, 0::2]
    output_image[:, 1::4, :, :] = input[:, :, 0::2, 1::2]
    output_image[:, 2::4, :, :] = input[:, :, 1::2, 0::2]
    output_image[:, 3::4, :, :] = input[:, :, 1::2, 1::2]

    # Concatenate standard deviation and image tensors
    return torch.cat((output_sigma, output_image), dim=1)

此函数随后作为模型 forward 函数的第一步调用:

def forward(self, x, sigma):
    x = downsample_and_noise_map(x, sigma)
    x = self.convnet(x)
    x = upsample(x)
    return x

让我们考虑一个大小为 1x3x100x100 的输入张量(PyTorch 标准:批量、通道、高度、宽度)和 0.1 的 sigma 值。输出张量具有以下属性:

如果这段代码不够清晰,我可以post一个更简单的版本。

上采样部分是下采样部分的倒数函数。

我能够使用这个函数在 PyTorch 中进行训练和测试。

然后,我尝试使用 ONNX 作为中间步骤将模型转换为 CoreML。 转换为 ONNX 生成 "TracerWarning"。从 ONNX 到 CoreML 的转换失败(TypeError:1.0 的类型为 numpy.float64,但应为以下之一:int、long)。问题来自下采样 + 噪声级图(也来自上采样)。

当我删除下采样 + 噪声级别图和上采样层时,我能够非常轻松地转换为 ONNX 和 CoreML,因为只剩下一个简单的卷积网络。这意味着我有一个解决我的问题的方法:在移动端使用 2 个着色器实现这 2 个层。但是我对这个解决方案不满意,因为我希望我的模型包含所有层 ^^

在考虑在这里写一个 post 之前,我在网上搜索了一个答案,我能够使用 reshapepermute 编写一个更好版本的前一个函数。此版本删除了所有 ONNX 警告,但 CoreML 转换仍然失败...

def downsample_and_noise_map(input, sigma):

    # Input image size
    in_n, in_c, in_h, in_w = input.size()

    # Output tensor size
    out_n = in_n
    out_h = in_h // 2
    out_w = in_w // 2

    # Create standard deviation tensor
    output_sigma = sigma.view(out_n, 1, 1, 1).repeat(out_n, in_c, out_h, out_w)

    # Split RGB channels
    channels_rgb = torch.split(input, 1, dim=1)

    # Reshape (space-to-depth) each image channel
    channels_reshaped = []
    for channel in channels_rgb:
        channel = channel.reshape(1, out_h, 2, out_w, 2)
        channel = channel.permute(2, 4, 0, 1, 3)
        channel = channel.reshape(1, 4, out_h, out_w)
        channels_reshaped.append(channel)

    # Concatenate all reshaped image channels together
    output_image = torch.cat(channels_reshaped, dim=1)

    # Concatenate standard deviation and image tensors
    output = torch.cat([output_sigma, output_image], dim=1)

    return output

下面是我的(部分)问题:

感谢您的帮助(以及您的耐心等待)^^

免责声明我不熟悉 CoreML 或部署到 iOS,但我确实有通过 ONNX 在 TensorRT 和 OpenVINO 中部署 PyTorch 模型的经验。

我在部署到其他框架时遇到的主要问题是切片和重复张量等操作在其他框架中的支持往往有限。通常我们可以构造等效的 conv 或 transpose-conv 操作来实现所需的行为。

为了确保我们不导出用于构造转换权重的逻辑,我将权重初始化与权重应用分开。这使得 ONNX 导出更加直接,因为它看到的只是应用了一些常量张量。

class DownsampleAndNoiseMap():
    def __init__(self):
        self.initialized = False
        self.weight = None
        self.zeros = None

    def init_weights(self, input):
        with torch.no_grad():
            in_n, in_c, in_h, in_w = input.size()

            out_h = int(in_h // 2)
            out_w = int(in_w // 2)
            sigma_c = in_c
            image_c = in_c * 4

            # conv weights used for downsampling
            self.weight = torch.zeros(image_c, in_c, 2, 2).to(input)
            for c in range(in_c):
                self.weight[4 * c, c, 0, 0] = 1
                self.weight[4 * c + 1, c, 0, 1] = 1
                self.weight[4 * c + 2, c, 1, 0] = 1
                self.weight[4 * c + 3, c, 1, 1] = 1

            # zeros used to replace repeat
            self.zeros = torch.zeros(in_n, sigma_c, out_h, out_w).to(input)

        self.initialized = True

    def __call__(self, input, sigma):
        assert self.initialized
        output_sigma = self.zeros + sigma
        output_image = torch.nn.functional.conv2d(input, self.weight, stride=2)
        return torch.cat((output_sigma, output_image), dim=1)

class Upsample():
    def __init__(self):
        self.initialized = False
        self.weight = None

    def init_weights(self, input):
        with torch.no_grad():
            in_n, in_c, in_h, in_w = input.size()

            image_c = in_c * 4

            self.weight = torch.zeros(in_c + image_c, in_c, 2, 2).to(input)
            for c in range(in_c):
                self.weight[in_c + 4 * c, c, 0, 0] = 1
                self.weight[in_c + 4 * c + 1, c, 0, 1] = 1
                self.weight[in_c + 4 * c + 2, c, 1, 0] = 1
                self.weight[in_c + 4 * c + 3, c, 1, 1] = 1

        self.initialized = True

    def __call__(self, input):
        assert self.initialized
        return torch.nn.functional.conv_transpose2d(input, self.weight, stride=2)

我假设上采样是下采样的倒数,在 x == upsample(downsample_and_noise_map(x, sigma)) 的意义上(如果我在这个假设中错了,请纠正我)。我还验证了我的降采样版本与你的一致。

# consistency checking code
x = torch.randn(1, 3, 100, 100)
sigma = torch.randn(1)

# OP downsampling
y1 = downsample_and_noise_map(x, sigma)

ds = DownsampleAndNoiseMap()
ds.init_weights(x)
y2 = ds(x, sigma)

print('downsample diff:', torch.sum(torch.abs(y1 - y2)).item())

us = Upsample()
us.init_weights(x)
x_recov = us(ds(x, sigma))

print('recovery error:', torch.sum(torch.abs(x - x_recov)).item())

这导致

downsample diff: 0.0
recovery error: 0.0

正在导出到 ONNX

导出时,我们需要在使用 torch.onnx.export 之前为新的 类 调用 init_weights。例如

class Model(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.downsample = DownsampleAndNoiseMap()
        self.upsample = Upsample()
        self.convnet = lambda x: x  # placeholder

    def init_weights(self, x):
        self.downsample.init_weights(x)
        self.upsample.init_weights(x)

    def forward(self, x, sigma):
        x = self.downsample(x, sigma)
        x = self.convnet(x)
        x = self.upsample(x)
        return x

x = torch.randn(1, 3, 100, 100)
sigma = torch.randn(1)

model = Model()
# ... load state dict here
model.init_weights(x)
torch.onnx.export(model, (x, sigma), 'deploy.onnx', verbose=True, input_names=["input", "sigma"], output_names=["output"])

给出 ONNX 图

graph(%input : Float(1, 3, 100, 100)
      %sigma : Float(1)) {
  %2 : Float(1, 3, 50, 50) = onnx::Constant[value=<Tensor>](), scope: Model
  %3 : Float(1, 3, 50, 50) = onnx::Add(%2, %sigma), scope: Model
  %4 : Float(12, 3, 2, 2) = onnx::Constant[value=<Tensor>](), scope: Model
  %5 : Float(1, 12, 50, 50) = onnx::Conv[dilations=[1, 1], group=1, kernel_shape=[2, 2], pads=[0, 0, 0, 0], strides=[2, 2]](%input, %4), scope: Model
  %6 : Float(1, 15, 50, 50) = onnx::Concat[axis=1](%3, %5), scope: Model
  %7 : Float(15, 3, 2, 2) = onnx::Constant[value=<Tensor>](), scope: Model
  %output : Float(1, 3, 100, 100) = onnx::ConvTranspose[dilations=[1, 1], group=1, kernel_shape=[2, 2], pads=[0, 0, 0, 0], strides=[2, 2]](%6, %7), scope: Model
  return (%output);
}

关于在 iOS 上推荐的部署方式的最后一个问题,我无法回答,因为我没有这方面的经验。