运行 使用 PyTorchLightning 在多 GPU 的 DDP 模式下测试计算

Running test calculations in DDP mode with multiple GPUs with PyTorchLightning

我有一个模型,我尝试在 DDP 模式下与训练器一起使用。

import pytorch_lightning as pl
import torch
import torchvision
from torchmetrics import Accuracy
class Model(pl.LightningModule):

    def __init__(
        self,
        model_name: str,
        num_classes: int,
        model_hparams: Dict["str", Union[str, int]],
        optimizer_name: str,
        optimizer_hparams: Dict["str", Union[str, int]],
    ):

        super().__init__()
        self.save_hyperparameters()
        self.model = torchvision.resnet18(num_classes=num_classes, **model_hparams)

        self.loss_module = CrossEntropyLoss()
        self.example_input_array = torch.zeros((1, 3, 512, 512), dtype=torch.float32)
        # Trying to use in DDP mode
        self.test_accuracy = Accuracy(num_classes=num_classes)

    def forward(self, imgs) -> Tensor:
        return self.model(imgs)

    # <redacted training_*, val_*, etc. as they are not relevant>

    def test_step(self, batch, batch_idx):
        imgs, labels = batch
        preds = self.model(imgs)
        self.test_accuracy.update(preds, labels)
        return labels, preds.argmax(dim=-1)

    def test_epoch_end(self, outputs) -> None:
        num_classes = self.hparams.num_classes
        # Creates table of correct and incorrect predictions
        results = torch.zeros((num_classes, num_classes))
        for output in outputs:
            for label, prediction in zip(*output):
                results[int(label), int(prediction)] += 1
        # Total accuracy. This and `compute` are identical in 1 GPU training
        acc = results.diag().sum() / results.sum()
        self.log("test_acc", self.test_accuracy.compute())
        print(results) # This prints twice

和教练

trainer = pl.Trainer(
        gpus=torch.cuda.device_count(),
        max_epochs=180,
        callbacks=callbacks,
        strategy="ddp",
        auto_scale_batch_size="binsearch",
    )

但是,我从 test

获得打印件
tensor([[0., 0., 0., 0., 0., 5.],
        [0., 7., 0., 0., 0., 0.],
        [0., 3., 0., 0., 0., 2.],
        [0., 3., 0., 0., 0., 0.],
        [0., 3., 0., 0., 0., 2.],
        [0., 1., 0., 0., 0., 4.]])tensor([[0., 0., 0., 0., 0., 6.],
        [0., 2., 0., 0., 0., 0.],
        [0., 4., 0., 0., 0., 2.],
        [0., 2., 0., 0., 0., 1.],
        [0., 3., 0., 0., 0., 2.],
        [0., 5., 0., 0., 0., 3.]])

还有

trainer.fit(model, datamodule=datamodule)
test_results = trainer.test(model, datamodule=datamodule)
print(test_results) 
# [{'test_acc': 0.18333333730697632}]
# [{'test_acc': 0.18333333730697632}]

我只希望打印单个张量。我如何才能对所有测试预测进行计算,而不是通过 GPU 和 return 我在 test_epoch_end 中根据这些预测创建的 table?我将文档解释为 *_epoch_end 仅在单个 GPU 上执行并且我很迷茫。

我认为你应该使用以下技巧:

  • test_epoch_end:在ddp模式下,每个gpu在这个方法中运行相同的代码。所以每个 gpu 计算部分批次而不是整个批次的指标。您需要将指标和收集同步到 rank==0 gpu 进行计算 整个数据集的评估指标。

  • torch.distributed.reduce:该方法跨分布式GPU设备收集和计算张量。 (docs)

  • self.trainer.is_global_zero:这个标志对于 rank==0

    是正确的
  • 在测试集上手动计算指标的最佳方法是什么?你应该检查 docs

使用上述技术,您可以计算整个数据集的指标,并在 .test 之后使用 results 张量。这是片段:

import os

import torch
from torch.utils.data import DataLoader

from torchvision import models, transforms
from torchvision.datasets import CIFAR10

from pytorch_lightning import LightningModule, LightningDataModule, Trainer

os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'


class CIFAR(LightningDataModule):
    def __init__(self, img_size=32, batch_size=32):
        super().__init__()
        self.img_size = img_size if isinstance(img_size, tuple) else (img_size, img_size)
        self.batch_size = batch_size

        self.test_transforms = transforms.Compose([
            transforms.Resize(self.img_size),
            transforms.CenterCrop(self.img_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
        ])

    def prepare_data(self) -> None:
        CIFAR10(root='data', train=True, download=True)
        CIFAR10(root='data', train=False, download=True)
    
    def setup(self, stage=None):
        self.test_ds = CIFAR10(root='data', train=False, download=False, transform=self.test_transforms)

    def test_dataloader(self):
        return DataLoader(self.test_ds, num_workers=4, batch_size=self.batch_size, shuffle=False)


class BasicModule(LightningModule):
    def __init__(self):
        super().__init__()
        self.model = models.resnet18(num_classes=10, pretrained=False)

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.model(x)
        return y, y_hat.argmax(dim=-1)

    def test_epoch_end(self, outputs):
        results = torch.zeros((10, 10)).to(self.device)
        for output in outputs:
            for label, prediction in zip(*output):
                results[int(label), int(prediction)] += 1
        torch.distributed.reduce(results, 0, torch.distributed.ReduceOp.SUM)
        acc = results.diag().sum() / results.sum()
        if self.trainer.is_global_zero:
            self.log("test_metric", acc, rank_zero_only=True)
            self.trainer.results = results
        
    
if __name__ == '__main__':
    data = CIFAR(batch_size=512)
    model = BasicModule()
    trainer = Trainer(max_epochs=2, gpus='0,1', strategy="ddp", precision=16)
    test_results = trainer.test(model, data)
    if trainer.is_global_zero:
        print(test_results)
        print(trainer.results)