Running test calculations in DDP mode with multiple GPUs with PyTorchLightning
I have a model that I am trying to use with the Trainer in DDP mode.
import pytorch_lightning as pl
import torch
import torchvision
from torch import Tensor
from torch.nn import CrossEntropyLoss
from torchmetrics import Accuracy
from typing import Dict, Union

class Model(pl.LightningModule):
    def __init__(
        self,
        model_name: str,
        num_classes: int,
        model_hparams: Dict[str, Union[str, int]],
        optimizer_name: str,
        optimizer_hparams: Dict[str, Union[str, int]],
    ):
        super().__init__()
        self.save_hyperparameters()
        self.model = torchvision.models.resnet18(num_classes=num_classes, **model_hparams)
        self.loss_module = CrossEntropyLoss()
        self.example_input_array = torch.zeros((1, 3, 512, 512), dtype=torch.float32)
        # Trying to use in DDP mode
        self.test_accuracy = Accuracy(num_classes=num_classes)

    def forward(self, imgs) -> Tensor:
        return self.model(imgs)

    # <redacted training_*, val_*, etc. as they are not relevant>

    def test_step(self, batch, batch_idx):
        imgs, labels = batch
        preds = self.model(imgs)
        self.test_accuracy.update(preds, labels)
        return labels, preds.argmax(dim=-1)

    def test_epoch_end(self, outputs) -> None:
        num_classes = self.hparams.num_classes
        # Creates a table of correct and incorrect predictions
        results = torch.zeros((num_classes, num_classes))
        for output in outputs:
            for label, prediction in zip(*output):
                results[int(label), int(prediction)] += 1
        # Total accuracy. This and `compute` are identical in 1-GPU training
        acc = results.diag().sum() / results.sum()
        self.log("test_acc", self.test_accuracy.compute())
        print(results)  # This prints twice
And the trainer:
trainer = pl.Trainer(
    gpus=torch.cuda.device_count(),
    max_epochs=180,
    callbacks=callbacks,
    strategy="ddp",
    auto_scale_batch_size="binsearch",
)
However, I get this printout from test:
tensor([[0., 0., 0., 0., 0., 5.],
        [0., 7., 0., 0., 0., 0.],
        [0., 3., 0., 0., 0., 2.],
        [0., 3., 0., 0., 0., 0.],
        [0., 3., 0., 0., 0., 2.],
        [0., 1., 0., 0., 0., 4.]])tensor([[0., 0., 0., 0., 0., 6.],
        [0., 2., 0., 0., 0., 0.],
        [0., 4., 0., 0., 0., 2.],
        [0., 2., 0., 0., 0., 1.],
        [0., 3., 0., 0., 0., 2.],
        [0., 5., 0., 0., 0., 3.]])
And:
trainer.fit(model, datamodule=datamodule)
test_results = trainer.test(model, datamodule=datamodule)
print(test_results)
# [{'test_acc': 0.18333333730697632}]
# [{'test_acc': 0.18333333730697632}]
I only want a single tensor to be printed. How can I run the computation over all test predictions across the GPUs, and return the table I build from those predictions in test_epoch_end? I read the docs as saying that *_epoch_end runs on a single GPU only, and I'm lost.
I think you should use the following techniques:

test_epoch_end: in DDP mode, every GPU runs the same code inside this method, so each GPU computes the metric over its own share of the batches rather than over the whole dataset. You need to synchronize the metric, gathering it onto the rank==0 GPU, in order to compute the evaluation metric over the entire dataset.

torch.distributed.reduce: this method collects and reduces tensors across the distributed GPU devices (see the minimal sketch after this list). (docs)

self.trainer.is_global_zero: this flag is True only for the rank==0 process.
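To make the reduce step concrete, here is a minimal sketch of what torch.distributed.reduce does. It assumes the process group is already initialized (as it is inside a Lightning DDP run), and the tensor values here are made up purely for illustration:

import torch
import torch.distributed as dist

# Each DDP process holds its own partial counts.
partial_counts = torch.tensor([1.0, 2.0], device="cuda")

# Sum the tensors from every rank into the copy held by rank 0.
# After the call, only rank 0 is guaranteed to hold the total;
# the contents on the other ranks are unspecified.
dist.reduce(partial_counts, dst=0, op=dist.ReduceOp.SUM)

if dist.get_rank() == 0:
    print(partial_counts)  # e.g. tensor([2., 4.]) with two ranks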
What is the best way to manually compute metrics on the test set? You should check the docs. Using the techniques above, you can compute the metric over the whole dataset and access the results tensor after .test. Here is the snippet:
import os

import torch
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torchvision.datasets import CIFAR10
from pytorch_lightning import LightningModule, LightningDataModule, Trainer

os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'

class CIFAR(LightningDataModule):
    def __init__(self, img_size=32, batch_size=32):
        super().__init__()
        self.img_size = img_size if isinstance(img_size, tuple) else (img_size, img_size)
        self.batch_size = batch_size
        self.test_transforms = transforms.Compose([
            transforms.Resize(self.img_size),
            transforms.CenterCrop(self.img_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
        ])

    def prepare_data(self) -> None:
        CIFAR10(root='data', train=True, download=True)
        CIFAR10(root='data', train=False, download=True)

    def setup(self, stage=None):
        self.test_ds = CIFAR10(root='data', train=False, download=False, transform=self.test_transforms)

    def test_dataloader(self):
        return DataLoader(self.test_ds, num_workers=4, batch_size=self.batch_size, shuffle=False)

class BasicModule(LightningModule):
    def __init__(self):
        super().__init__()
        self.model = models.resnet18(num_classes=10, pretrained=False)

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self.model(x)
        return y, y_hat.argmax(dim=-1)

    def test_epoch_end(self, outputs):
        # Build this rank's confusion matrix from its share of the batches.
        results = torch.zeros((10, 10)).to(self.device)
        for output in outputs:
            for label, prediction in zip(*output):
                results[int(label), int(prediction)] += 1
        # Sum the per-rank matrices onto rank 0.
        torch.distributed.reduce(results, 0, torch.distributed.ReduceOp.SUM)
        acc = results.diag().sum() / results.sum()
        if self.trainer.is_global_zero:
            self.log("test_metric", acc, rank_zero_only=True)
            # Stash the full table so it is reachable after trainer.test().
            self.trainer.results = results

if __name__ == '__main__':
    data = CIFAR(batch_size=512)
    model = BasicModule()
    trainer = Trainer(max_epochs=2, gpus='0,1', strategy="ddp", precision=16)
    test_results = trainer.test(model, data)
    if trainer.is_global_zero:
        print(test_results)
        print(trainer.results)
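As a side note beyond the original answer: LightningModule also exposes a self.all_gather helper that wraps the same kind of collective, so you can avoid calling torch.distributed directly. A sketch of the same test_epoch_end rewritten with it, under the same multi-GPU DDP assumptions:

def test_epoch_end(self, outputs):
    results = torch.zeros((10, 10), device=self.device)
    for output in outputs:
        for label, prediction in zip(*output):
            results[int(label), int(prediction)] += 1
    # In DDP with N processes, self.all_gather returns an (N, 10, 10)
    # tensor; summing over the first dim leaves every rank holding the
    # confusion matrix for the whole dataset, so no reduce-to-rank-0
    # is needed before reading it.
    results = self.all_gather(results).sum(dim=0)
    if self.trainer.is_global_zero:
        acc = results.diag().sum() / results.sum()
        self.log("test_metric", acc, rank_zero_only=True)
        self.trainer.results = results

Note also that, as far as I know, the TorchMetrics Accuracy in the question already syncs its state across processes when compute() is called, which is why test_acc came out identical on both ranks; it is only the hand-built results table that needs an explicit collective.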