pytorch中的分布式顺序窗口数据

Distributed sequential windowed data in pytorch

在我训练的每个时期,我需要将我的数据集分成 nt 个连续样本。例如,如果我的数据是 [1,2,3,4,5,6,7,8,9,10]n = 2t = 3,那么有效的批次将是

[1-2-3, 4-5-6] and [7-8-9, 10-1-2]
[2-3-4, 8-9-10] and [5-6-7, 1-2-3]

我的旧版本如下,但它对数据中的每个点都进行了采样,这意味着我将每个时期解析整个数据集 t 次。

train_dataset = list(range(n))
train_sampler = None
if distributed:
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=bsize, shuffle=(train_sampler is None),
        pin_memory=True, sampler=train_sampler)

for epoch in range(epochs):
    if distributed: 
        train_sampler.set_epoch(epoch)

    for starting_i in train_loader:
        batch = np.array([np.mod(np.arange(i, i + t), n) for i in starting_i])

我现在已经实现了自己的抽样函数,将数据分成随机批次,其中每个样本与两个最接近的样本相距甚远 t。在非分布式场景下,我可以做到

for epoch in range(epochs):
    pad = np.random.randint(n)
    train_loader = np.mod(np.arange(pad, n + pad, t), n)
    np.random.shuffle(train_loader)
    train_loader = np.array_split(train_loader,
        np.ceil(len(train_loader) / bsize))

    for starting_i in train_loader:
        batch = np.array([np.mod(np.arange(i, i + t), n) for i in starting_i])

如何发布这个版本?我是否需要自定义 torch.nn.parallel.DistributedDataParalleltorch.utils.data.DataLoader

我检查了 DistributedSampler class 我的猜测是我必须重写 __iter__ 方法。我说得对吗?
DistributedSampler 如何拆分数据集? num_replicas中依次是? 说 num_replicas = 2。我的数据集会在 2 名工作人员之间分成 [1,2,3,4,5][6,7,8,9,10] 吗?还是随机的?喜欢 [1,4,7,3,10][2,9,5,8,6]?第一种情况对我来说没问题,因为保持样本顺序,但第二种情况不行。

我最终制作了自己的 Dataset,其中数据为 [t, t + window, ... t + n * window]。每次调用它时,它都会随机化 window 的起始索引。然后采样器像往常一样进行洗牌。对于可重复性,它有一个类似于 set_epoch 采样器的 set_seed 方法。

class SequentialWindowedDataset(Dataset):
    def __init__(self, size, window):
        self.size = size
        self.window = window
        self.seed = 0
        self.data = np.arange(0, self.size, self.window)

    def __getitem__(self, index):
        rng = np.random.default_rng(self.seed)
        pad = rng.integers(0, self.size)
        data = (self.data + pad) % self.size
        return data[index]

    def __len__(self):
        return len(self.data)

    def set_seed(self, seed):
        self.seed = seed

以下版本在调用外随机化数据,速度更快

class SequentialWindowedDataset(Dataset):
    def __init__(self, size, window):
        self.size = size
        self.window = window
        self.data = np.arange(0, self.size, self.window)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.data)

    def randomize(self, seed):
        rng = np.random.default_rng(seed)
        pad = rng.integers(0, self.size)
        self.data = (self.data + pad) % self.size