模型训练的原理:

前向传播:在前向传播过程中,模型接受输入数据,并通过当前的参数计算出预测值。对于神经网络模型,前向传播可以简单描述为输入数据通过一系列的神经元层,最终得到模型的预测输出。反向传播:在反向传播过程中,模型根据预测输出与实际值之间的差异,计算出损失函数,并通过链式法则将损失函数沿着网络反向传播,更新模型中的参数。这个过程可以使得模型的预测结果与实际值更加接近,提高模型的准确性。

数据并行 vs. 模型并行

模型并行:数据拷贝(per device),模型 split/chunk(显然是单卡放不下模型的情况下),不能加速训练,但可以解决OOM。数据并行:模型拷贝(per device),数据 split/chunk(对batch切分),可以加速训练。

每个device上都拷贝一份完整模型,每个device分别处理1个batch的一部分(如batch_size=64, 2个device, 每device处理32个样本)梯度反向传播时,每个设备上的梯度求和(求和才是一个完整batch所有样本的loss),汇入中心设备/参数服务器(默认gpu0)对模型进行梯度优化。

DP vs. DDP

共同点:都是数据并行,都是即插即用(model=nn.DataParallel(model),model=DDP(model,device_ids=[args.local_rank], output_device=args.local_rank).to(device)) 不同点:

DP:nn.DataParallel (不推荐) 仅支持单机多卡, 显存负载不均衡(device[0]负载大),不支持Apex混合精度训练,GPU利用不均衡,因Python GIL争用影响 仅支持单进程多线程,训练速率低。 DDP: DistributedDataParallel (推荐) 支持单机多卡&多机多卡,数据分配均衡,支持Apex混合精度训练,借助All-Reduce的数据交换方式提高GPU的通讯效率,支持多进程,训练速率高。

0. 不并行(单机单卡)1. 数据并行DP(nn.parallel.DataParallel)step1: 并行化包裹模型step2:加载到device0step3:forward前向传播step4:反向传播梯度聚合

2. 分布式数据并行DDP(nn.parallel.DistributedDataParallel)2.1 DDP基本概念2.2 用torch.distributed.launch启动的代码2.3 手写torch.multiprocessing启动的代码step1:导入相关的包step2:ddp_setup函数step3:Trainer类step4:MyTrainDataset类step5:main函数step6:解析命令行参数并运行主函数

2.4 用torchrun启动的代码(推荐)2.5 多机多卡实战:DDP训练Transformer

3. 模型并行3.1 Huggingface实现3.2 to(device)手动实现3.3 accelerate实现

4. Deepspeedstep1:deepspeed初始化step2:deepspeed封装模型和数据集step3:训练单机节点node多卡gpu运行实战:DeepSpeed训练StableDiffusion和Transformer

5. Accelerate6. Trainer

0. 不并行(单机单卡)

判断GPU是否可用:device = "cuda" if torch.cuda.is_available() else "cpu"模型拷贝:model.to(device)数据拷贝:data.to(device)模型加载与保存:torch.load(xx.pt, map_location=device), torch.save(model, "model.pt")

1. 数据并行DP(nn.parallel.DataParallel)

简单一行代码,包裹model即可:model = DataParallel(model.to(device), device_ids=[0, 1, 2, 3])数据复制到master GPU上:data.to("cuda")batch_size应该是所有GPU的batch_size总和模型加载与保存(注意保存model.module.state_dict):torch.load(xx.pt, map_location=[0,1,2,3]), torch.save(model.module.state_dict, "model_state_dict.pt"),model.load_state_dict(state_dict)

预先定义一下Dataset和Model

import torch

import torch.nn as nn

from torch.utils.data import Dataset, DataLoader

class RandomDataset(Dataset):

def __init__(self, size, length):

self.len = length

# 100*5

self.data = torch.randn(length, size)

def __getitem__(self, index):

# (5, )

return self.data[index]

def __len__(self):

# 100

return self.len

class Model(nn.Module):

# Our model

def __init__(self, input_size, output_size):

# 5 => 2

super(Model, self).__init__()

self.fc = nn.Linear(input_size, output_size)

def forward(self, input):

output = self.fc(input)

print("\tIn Model: input size", input.size(),

"output size", output.size())

return output

input_size = 5 # 模型输入数据维度(b,n) = (30, 5)

output_size = 2 # 模型输出数据维度(b,n) = (30, 2)

batch_size = 30 # batch size

data_size = 100 # 数据集样本数量

rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size),

batch_size=batch_size,

shuffle=True)

# 构造优化器和损失函数

optimizer = optim.SGD(model.parameters(), lr=0.01)

criterion = nn.MSELoss()

# 模拟目标值

target = torch.randn(64, 5)

step1: 并行化包裹模型

# Parameters and DataLoaders

# (5, 2)

model = Model(input_size, output_size)

if torch.cuda.device_count() > 1: # 如果不止1张GPU

# 构建数据并行模型

device_ids = [0, 1] # 使用的设备ID列表

# 如3张GPU,dim = 0,[30, xxx] -> [15, ...], [15, ...] on 2 GPUs

model = nn.DataParallel(model, device_ids) # 并行化,默认使用所有device加载数据

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

model= 指传入的模型device_ids=None,

参与训练的 GPU 有哪些,device_ids=gpus,默认None是使用全部device; output_device=None

指定中心设备(参数服务器),用于汇总梯度的 GPU 是哪个,output_device=gpus[0] dim=0

从那一维度进行数据切分,默认batch维度 在执行 forward/backward 之前,使用 DataParallel 将 model 复制到 device_ids 指定设备上,进行数据并行处理。

model.to('cuda:0')不同的是tensor的to(device)是在device上生成一个拷贝,不改变原来cpu上的tensor;而model是直接将原model转移到gpu上。

step2:加载到device0

设置中心设备(参数服务器),用于反向传播时的梯度汇总,一般指定cuda:0

# 将模型从cpu放在gpu 0上

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

model.to(device)

step3:forward前向传播

模型forward时,将data_loader加载的一个batch的数据进行切分,送入不同device的模型进行计算,再将结果合并输出。

for data in rand_loader:

# input_var can be on any device, including CPU

input = data.to(device)

# input = data

output = model(input)

print("Outside: input size", input.size(),

"output_size", output.size())

"""

In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])

In Model: input size torch.Size([15, 5]) output size torch.Size([15, 2])

Outside: input size torch.Size([30, 5]) output_size torch.Size([30, 2])

"""

step4:反向传播梯度聚合

loss.backward()分别在每个device上计算loss的梯度,average_gradients(model)将梯度聚合到中心设备/参数服务器(cuda:0)上,进行梯度优化

# 在每个设备上进行前向传播和梯度计算

loss = criterion(output, target)

loss.backward()

# 对各个设备上的梯度进行求和

average_gradients(model)

# 使用原始设备模型进行梯度优化

optimizer.step()

2. 分布式数据并行DDP(nn.parallel.DistributedDataParallel)

multiple GPUs in a single machine/server/node:单机多卡

DDP采用Bucketing分桶机制:实现communication和computation并行执行。

通过在NCCL和GLOO上实验发现,桶大小bucket_size设置为25MB比较合适。(GLOO是一种高性能的分布式训练框架,支持CPU和非NVIDIAGPU上的分布式训练;NCCL是NVIDIA提供的GPU专用通信库,被广泛应用于GPU上的分布式训练)

分布式数据并行时,模型(model parameters)/优化器(optimizer states)每张卡都会拷贝一份(replicas)

DDP 始终在卡间维持着模型参数和优化器状态的同步一致性在整个训练过程中; Data Parallel,一个batch的数据通过 DistributedSampler 切分split 分发到不同的 gpus 上

此时虽然模型/optimizer 相同,但因为每个device的数据输入不同,导致 loss 不同,反向传播时计算到的梯度也会不同此时 ddp 通过 ring all-reduce algorithm ,保证每个batch step结束后不同卡间model/optimizer 的同步一致性

如上图所示,Ring all-reduce algorithm

首先会将所有的 gpu cards 连成一个 ring环其同步过程,不需要等待所有的卡都计算完一轮梯度,经过这个同步过程之后,所有的卡的 models/optimizers 就都会保持一致的状态;

Ring all-reduce algorithm 计算和同步的几个过程

红线:GPUs 分别计算损失(forward)和梯度(backward)蓝线:梯度的聚合到中心device/参数服务器上(gpu0)绿线:(模型/优化器)参数的更新及广播(broadcast);

其实参数服务器master 可以是一个GPU0,也可以是CPU,也可以是所有GPU: 但将数据发送到GPU0会成为device通信的瓶颈:

所以采用环形的梯度聚合方式更加高效:

2.1 DDP基本概念

world:

world 表示包含所有进程的组(所有gpu的集合)。每个进程通常对应一个 GPU, world 中的进程可以相互通信,这使得使用分布式数据并行(Distributed Data Parallel, DDP)进行训练成为可能。 world_size(gpu个数/进程个数):

world_size 表示分布式训练环境中的总进程数/gpu数。每个进程都会被分配一个唯一的标识符(rank),从 0 到 world_size-1。 rank(进程标识符):

rank 是分配给world中每个进程的唯一标识符,用于标识每个进程在分布式训练中的角色。local rank是分配个单个node中每个进程的标识符,world中可能有多个node。每个进程的local_rank都不一样。 判断master GPU:if local_rank == 0: node(节点):

node 可以理解为一个服务器,代表着物理设备上的一个实体。在多机分布式训练中,每台机器被视为一个节点,节点之间需要进行通信。例如,如果有2 个node/server,每个 node/server/machine 各有4张卡(4 gpus)。total_world_size = 2(节点数) * 4(每个节点的 GPU 数量)= 8, rank 的取值范围为 [0, 1, 2, 3, 4, 5, 6, 7], local_rank 的取值范围为 [0, 1, 2, 3],[0, 1, 2, 3] 分别对应着不同的节点上的进程。 All to one:聚合过程(reduce),所有GPU(model和optiminizer状态)汇聚到参数服务器; one to All:广播过程(broadcast),参数服务器广播到所有GPU;

2.2 用torch.distributed.launch启动的代码

用torch.distributed.launch运行:会根据n_gpus自动分配进程个数(GPU个数),但需要train.py中有接受local_rank的参数选项,launch会自动为每个GPU传入这个参数。

python -m torch.distributed.launch --nproc_per_node=n_gpus train.py

初始化进程组:torch.distributed.init_process_group(backend="nccl", world_size=n_gpus, rank=local_rank) 设置当前进程使用的GPU:torch.cuda.set_device(local_rank) # 相当于CUDA_VISIBLE_DEVICES 对模型包裹:model = DistributedDataParallel(model.to(local_rank), device_ids=[local_rank]) 对数据集包裹,将data的不同样本分配到不同GPU,实现数据并行:train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset) 构造数据加载器:train_data_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size,..., sampler=train_sampler) 加载数据到GPU:data = data.to(local_rank) 模型加载与保存(注意保存model.module.state_dict):torch.load(xx.pt, map_location=[0,1,2,3]), torch.save(model.module.state_dict, "model_state_dict.pt"),model.load_state_dict(state_dict) 在每个epoch开始处,调用train_sampler.set_epoch(epoch)使得数据充分打乱。 因为已经有了train_sampler,就不要在Data Loaer中设置shufle=True 和DP不同的是,DDP的batch_size应该是单个GPU需要的batch_size大小。

2.3 手写torch.multiprocessing启动的代码

按照下面的脚本:

python DDP_script_no_torchrun.py

step1:导入相关的包

import os

import torch

import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader

import torch.multiprocessing as mp

from torch.utils.data.distributed import DistributedSampler # 分发数据

from torch.nn.parallel import DistributedDataParallel as DDP # 包装model使之数据并行

from torch.distributed import init_process_group, destroy_process_group

step2:ddp_setup函数

在不用trochrun时,这个函数用于设置分布式训练的环境(对rank号GPU进行初始化)。它调用了init_process_group函数来初始化进程组,在分布式情况下,使用backend(后端)是nccl完成GPU间通信(NVIDIA Collective Communication Library是NVIDIA 专用的GPU通信库),然后使用torch.cuda.set_device函数,根据环境变量设置当前进程使用的GPU设备。

def ddp_setup(rank, world_size):

"""

Args:

rank: Unique identifier of each process

world_size: Total number of processes

"""

# rank 0 process

os.environ["MASTER_ADDR"] = "localhost"

os.environ["MASTER_PORT"] = "12355"

# nccl:NVIDIA Collective Communication Library

# 分布式情况下的,gpus 间通信

init_process_group(backend="nccl", rank=rank, world_size=world_size)

torch.cuda.set_device(rank)

step3:Trainer类

这个类定义了一个模型训练的封装器。在初始化方法中,它接收一个模型backend、一个训练数据加载器train_dataloader、一个优化器train_dataloader作为参数,并将模型移动到GPU上,然后使用DistributedDataParallel对模型进行包装,以实现数据并行。(model先放cuda再DDP封装)

分布式体现在哪?原本单GPU执行 每个epoch下,Len(data_loader)=2048,Batchsize=32,一个GPU计算的数据量 Steps = Len(data_loader) / Batchsize,即2048/32=64个;而使用8个GPU时,每个GPU计算的数据量Steps = Len(data_loader) / Batchsize / world_size,即2048/32/8=64/8=8个,单个GPU吞吐的数据变少,BatchSize就可以调大了。

_run_batch方法实现了一次批量的训练过程,包括前向传播、计算损失、反向传播和更新参数。_run_epoch方法用于遍历整个训练集进行训练,self.train_dataloader.sampler.set_epoch(epoch)是用于设置数据加载器的epoch,以保证每个GPU在每个epoch开始时加载的数据都是不同的。train方法则用于控制训练的总体流程。

class Trainer:

def __init__(self,

model: torch.nn.Module,

train_dataloader: DataLoader,

optimizer: torch.optim.Optimizer,

gpu_id: int) -> None:

self.gpu_id = gpu_id # rank

self.train_dataloader = train_dataloader

self.optimizer = optimizer

# 对模型进行wrap

self.model = model.to(self.gpu_id)

self.model = DDP(model, device_ids=[self.gpu_id]) # 每张卡都会维护一个model

def _run_batch(self, xs, ys):

self.optimizer.zero_grad()

output = self.model(xs)

loss = F.cross_entropy(output, ys)

loss.backward()

self.optimizer.step()

def _run_epoch(self, epoch):

batch_size = len(next(iter(self.train_dataloader))[0])

# 打印在哪个GPU上跑的哪个epoch (Steps = Len(data_loader) / Batchsize / world_size)

print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')

# 每个epoch对数据划分的方式不同

self.train_dataloader.sampler.set_epoch(epoch)

for xs, ys in self.train_dataloader:

xs = xs.to(self.gpu_id)

ys = ys.to(self.gpu_id)

self._run_batch(xs, ys)

def train(self, max_epoch: int):

for epoch in range(max_epoch):

self._run_epoch(epoch)

step4:MyTrainDataset类

这个类定义了一个自定义的训练数据集。在初始化方法中,它接收一个大小参数,并生成一组随机的数据样本。__len__方法返回数据集的大小,__getitem__方法用于获取指定索引处的数据样本。

class MyTrainDataset(Dataset):

def __init__(self, size):

self.size = size

self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]

def __len__(self):

return self.size

def __getitem__(self, index):

return self.data[index]

step5:main函数

这个函数是程序的主函数。在函数内部,首先调用了ddp_setup函数来设置分布式训练的环境。

然后创建了一个自定义的训练数据集和相应的数据加载器,以及一个线性模型和一个优化器。DistributedSampler是PyTorch提供的一个分布式采样器,用于确保每个进程加载的数据都是不同的且顺序随机。sampler对象被传入训练数据集的构造函数,可以通过数据加载器(如torch.utils.data.DataLoader)的sampler参数指定。在每个进程中,DistributedSampler会根据进程ID和进程数量,将整个训练数据集划分成多个部分,并为每个进程提供其应加载的数据索引。这样,在分布式训练过程中,每个进程只会加载自己负责的数据部分,避免了数据重复加载。

接下来,创建了一个Trainer对象,并调用其train方法进行模型训练。最后调用destroy_process_group函数销毁进程组。

def main(rank: int, world_size: int, max_epochs: int, batch_size: int):

# register ddp

ddp_setup(rank, world_size)

train_dataset = MyTrainDataset(2048)

train_dataloader = DataLoader(train_dataset,

batch_size=batch_size,

pin_memory=True,

shuffle=False,

# batch input: split to each gpus (且没有任何 overlaping samples 各个 gpu 之间)

sampler=DistributedSampler(train_dataset))

model = torch.nn.Linear(20, 1)

optimzer = torch.optim.SGD(model.parameters(), lr=1e-3)

trainer = Trainer(model=model, optimizer=optimzer, train_dataloader=train_dataloader)

trainer.train(max_epochs)

destroy_process_group()

step6:解析命令行参数并运行主函数

在这个步骤中,首先使用argparse模块解析命令行参数,包括最大训练周期数max_epochs和批量大小batch_size。然后调用main函数,并将解析后的参数传递给它进行模型训练。

if __name__ == '__main__':

import argparse

parser = argparse.ArgumentParser(description='simple distributed training job')

parser.add_argument('--max_epochs', type=int, help='Total epochs to train the model')

parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')

args = parser.parse_args()

world_size = torch.cuda.device_count()

# 启动 world_size 个进程(每个进程对应一个GPU)执行train函数

mp.spawn(main, args=(world_size, args.max_epochs, args.batch_size), nprocs=world_size)

# mp.spawn传入函数名train

# train()的第一个参数 rank 由 mp 的 id 自动指定

# train()的其他个参数 由args传入

# nprocs=world_size指定进程数量

2.4 用torchrun启动的代码(推荐)

torchrun运行分布式train.py脚本,区别:torchrun等于python -m torch.distributed.launch,会自动设置local_rank和world_size。

torchrun --nproc-per-node=2 ddp_gpus_torchrun.py --max_epochs 5 --batch_size 32

torchrun的参数:nproc-per-node设置每个node服务器上的gpu个数(一般是1个服务器下的gpu个数)python脚本的参数:ddp_gpus_torchrun.py脚本名称,--max_epochs 5 --batch_size 32脚本参数。

实现batch_size不变的情况下,对step的切分: (如单卡情况下,data_len=1024,batch_size=32,则一个gpu的step=1024/32=32) (多卡情况下2个gpu,data_len=1024,batch_size=32,则每个gpu的step=(1024/32)/2=32/2=16)

使用torchrun的情况下,不需要指定local_rank和world_size,把local_rank替换为int(os.environ['LOCAL_RANK'])即可:

import os, sys

import torch

import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader

import torch.multiprocessing as mp

from torch.utils.data.distributed import DistributedSampler

from torch.nn.parallel import DistributedDataParallel as DDP

from torch.distributed import init_process_group, destroy_process_group

"""

首先必须声明 setup 和 cleanup 函数:

* setup: 这将创建一个进程组,并且所有计算进程都可以通过这个进程组通信

* cleanup: 销毁进程组

"""

def ddp_setup():

"Setup the distributed environment"

# if rank 0 process MASTER_ADDR and MASTER_PORT is needed

# os.environ["MASTER_ADDR"] = "localhost"

# os.environ["MASTER_PORT"] = "12355"

# nccl:NVIDIA Collective Communication Library

init_process_group(backend="nccl")

torch.cuda.set_device(int(os.environ['LOCAL_RANK']))

def ddp_cleanup():

"Cleans up the distributed environment"

destroy_process_group()

class Trainer:

def __init__(self,

model: torch.nn.Module,

train_dataloader: DataLoader,

optimizer: torch.optim.Optimizer,

) -> None:

self.gpu_id = int(os.environ['LOCAL_RANK'])

self.model = model.to(self.gpu_id)

self.model = DDP(model, device_ids=[self.gpu_id])

self.train_dataloader = train_dataloader

self.optimizer = optimizer

def train(self, max_epoch: int):

self.model.train()

for epoch in range(max_epoch):

self._run_epoch(epoch)

batch_size = len(next(iter(self.train_dataloader))[0])

print(f'[GPU: {self.gpu_id}] Epoch: {epoch} | Batchsize: {batch_size} | Steps: {len(self.train_dataloader)}')

self.train_dataloader.sampler.set_epoch(epoch)

for xs, ys in self.train_dataloader:

xs, ys = xs.to(self.gpu_id), ys.to(self.gpu_id)

self.optimizer.zero_grad()

output = self.model(xs)

loss = F.cross_entropy(output, ys)

loss.backward()

self.optimizer.step()

class MyTrainDataset(Dataset):

def __init__(self, size):

self.size = size

self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]

def __len__(self):

return self.size

def __getitem__(self, index):

return self.data[index]

def main(max_epochs: int, batch_size: int):

ddp_setup()

train_dataset = MyTrainDataset(2048)

train_dataloader = DataLoader(train_dataset,

batch_size=batch_size,

pin_memory=True,

shuffle=False,

# batch input: split to each gpus (且没有任何 overlaping samples 各个 gpu 之间)

sampler=DistributedSampler(train_dataset))

model = torch.nn.Linear(20, 1)

optimzer = torch.optim.SGD(model.parameters(), lr=1e-3)

# set model to ddp model

trainer = Trainer(model=model, optimizer=optimzer, train_dataloader=train_dataloader)

trainer.train(max_epochs)

ddp_cleanup()

if __name__ == '__main__':

import argparse

parser = argparse.ArgumentParser(description='simple distributed training job')

parser.add_argument('--max_epochs', type=int, help='Total epochs to train the model')

parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')

args = parser.parse_args()

# world_size = torch.cuda.device_count()

main(args.max_epochs, args.batch_size)

看到这里,你可能疑问,我怎样把我的数据和模型发送到另一个 GPU 上完成 GPU间通信呢?

这正是 DistributedDataParallel 模块发挥作用的地方, 它将您的模型复制到每个 GPU 上 ,并且当 loss.backward() 被调用进行反向传播的时候,所有这些模型副本的梯度自动将被同步地平均/下降 (reduce)。这确保每个设备在执行优化器步骤后具有相同的权重。

2.5 多机多卡

使用torch.distributed.launch启动 --nproc_per_node=每个机器上有几个GPU --nnodes=节点数/机器数/服务器数 --node_rank=当前运行的命令在第几个节点上 --master_addr=主节点ip --master_port=主节点端口

实战:DDP训练Transformer

https://pytorch.org/tutorials/advanced/ddp_pipeline.html#model-scale-and-pipe-initialization

3. 模型并行

数据并行是切数据(scattering inputs and gathering outputs),模型并行是切模型(shards);

模型并行:单卡放不下一份模型;将一份大模型,不同的层切分到不同的卡上,forward时串行执行;

3.1 Huggingface实现

device_map:Huggingface支持自动实现模型并行

device_map参数的取值["auto", "balanced", "balanced_low_0", "sequential"]auto的模型分割优先级:GPU(s) > CPU (RAM) > Disk

如下,如果有两个gpu,device_map="auto"使模型的2个layers的parameter分别加载到两张gpu上(各一半):

from transformers import LlamaTokenizer, LlamaForCausalLM, GenerationConfig

model = LlamaForCausalLM.from_pretrained("decapoda-research/llama-7b-hf",

load_in_8bit=True,

device_map="auto",

)

for i, para in enumerate(model.named_parameters()):

# print(f'{i}, {para[0]}\t {para[1].device} \t{para[1].dtype}')

print(f'{i}, \t {para[1].device} \t{para[1].dtype}')`

3.2 to(device)手动实现

模型并行,卡间串行,时间换空间。

pytorch模拟Huggingface的模型并行原理:分别用to(device),将不同的layers加载到不同的gpu上,forward时将data也加载到对应gpu!!(weight*data之前需要保证两个tensor在相同的device)。

import torch

import torch.nn as nn

import torch.optim as optim

class ToyModel(nn.Module):

def __init__(self):

super(ToyModel, self).__init__()

self.net1 = torch.nn.Linear(10000, 10).to('cuda:0')

self.relu = torch.nn.ReLU()

self.net2 = torch.nn.Linear(10, 5).to('cuda:1')

def forward(self, x):

# 卡间串行执行

x = self.net1(x.to('cuda:0')))

x = self.net2(self.relu(x.to('cuda:1'))

return x

实例化模型,将其参数加载到2个GPU之后:

model = ToyModel()

进行一个batch的train:每个batch_size=20样本,5分类。label和pred计算loss之前也要统一device!

loss_fn = nn.MSELoss()

optimizer = optim.SGD(model.parameters(), lr=0.001)

optimizer.zero_grad()

outputs = model(torch.randn(20, 10000))

labels = torch.randn(20, 5).to('cuda:1')

loss_fn(outputs, labels).backward()

optimizer.step()

3.3 accelerate实现

accelerate需要将准备四种主要类型的对象:models (torch.nn.Module)、optimizers (torch.optim.Optimizer)、dataloaders (torch.data.dataloader.DataLoader)、scheduler(可选),一起传递给 prepare()方法。

Dataloader必须是torch.data.dataloader.DataLoader,否则会每张卡都把整个数据集全部加载一遍,就失去ddp的意义了。 ?还是说Accelerate设置batchsize=1时,每张卡都把整个数据集全部加载一遍,就失去ddp的意义了。 ?

大佬解答一下吧

看这3篇文章:

https://zhuanlan.zhihu.com/p/605640431https://zhuanlan.zhihu.com/p/606061177https://zhuanlan.zhihu.com/p/462453622https://zhuanlan.zhihu.com/p/668646528

4. Deepspeed

技术栈

术语:其实和前面DDP的概念一样。

Train的数据4部分组成:model模型参数、backward的梯度gradient、optimizer优化器参数、forward的数据tensor

Deepspeed、ZeRO技术方案:分发Partitioning(按gpu数量N等分数据)、卸载Offload(不用的数据放入CPU)、模型并行Pipeline(模型参数按层切分到不同gpu上)

step1:deepspeed初始化

使用 deepspeed.init_distributed() 方法替换了通常的 torch.distributed.init_process_group() 方法来初始化分布式设置。默认情况下,DeepSpeed使用NCCL后端进行分布式训练

# init distributed

deepspeed.init_distributed()

加载参数local_rank

def parse_arguments():

import argparse

parser = argparse.ArgumentParser(description='deepspeed training script.')

parser.add_argument('--local_rank', type=int, default=-1,

help='local rank passed from distributed launcher')

# Include DeepSpeed configuration arguments

parser = deepspeed.add_config_arguments(parser)

args = parser.parse_args()

return args

step2:deepspeed封装模型和数据集

deepspeed.initialize()封装model和dataset,相当于将模型和数据集交给deepspeed进行托管,engine就是deepspeed封装后的model,其他返回值同样都是deepspeed封装过的。(其中optimizer和lr_scheduler 后面是用不到的),我们只需要模型engine和数据加载器training_dataloader。

还要传入一个deepspeed的配置文件deepspeed_config。

# init model

model = MyClassifier(3, 100, ch_multi=128)

# init dataset

ds = MyDataset((3, 512, 512), 100, sample_count=int(1e6))

# init engine

engine, optimizer, training_dataloader, lr_scheduler = deepspeed.initialize(

args=args,

model=model,

model_parameters=model.parameters(),

training_data=ds,

config=deepspeed_config,

)

# load checkpoint

engine.load_checkpoint("./data/checkpoints/MyClassifier/")

step3:训练

在使用DeepSpeed进行分布式训练时,通常不需要手动调用optimizer.zero_grad()来清零梯度。DeepSpeed会自动处理梯度累积和梯度清零的操作,无需手动调用zero_grad()。

当使用DeepSpeed进行分布式训练时,一般会在engine.backward(loss)之后调用engine.step()来执行梯度更新操作。在engine.step()中,DeepSpeed会执行优化器的step()方法来更新模型参数,并在必要的时候自动清零梯度,以便进行下一轮的反向传播。

engine.train() # model.train()

for step, (data, label) in enumerate(training_dataloader):

step += 1

data= data.to(device=engine.device, dtype=torch.float16) # x

label = label.to(device=engine.device, dtype=torch.long).reshape(-1) # y

# deepspeed不需要optimizer.zero_grad()梯度清零

outputs = engine(data) # model forward

loss = F.cross_entropy(outputs, label)

engine.backward(loss) # 代替loss.backward()

engine.step() # 代替optimizer.step()

# 只在 local_rank=0 保存模型,且使用 model.state_dict() 保存;

if step % save_step_interval == 0 and local_rank == 0:

os.makedirs(save_path, exist_ok=True)

save_file = os.path.join(save_path, f"cifar_net_step_{step}.pth")

torch.save({'epoch': epoch_index,

####### 3. use model.module.state_dict() instead of model.state_dict()

'model_state_dict': model.module.state_dict(),

'optimizer_state_dict': optimizer.state_dict(),

'loss': loss,

}, save_file)

单机节点node多卡gpu运行

针对多节点/多服务器:--hostfile是准备主机文件, 创建一个主机文件(hostfile),其中列出了可访问且支持无密码SSH访问的机器名称或SSH别名,以及每台机器上可用于训练的GPU数量。 --num_nodes 和 --num_gpus 参数限制训练作业使用的节点数和GPU数。例如,–num_nodes=2 限制使用两个节点进行训练。--include 和 --exclude 参数选择性地包含或排除特定资源。例如,–exclude=“worker-2:0@worker-3:0,1” 表示排除worker-2上的GPU 0和worker-3上的GPU 0和1。

deepspeed \

--hostfile hostfile \

deepspeed_script.py \

-- \

--deepspeed \

--deepspeed_config "$PWD/deepspeed_config.json"

针对单节点/单服务器:无需使用主机文件--hostfile和--num_nodes 和 --num_gpus。它将根据本地机器上的GPU数量自动发现可用的插槽数量。

deepspeed \

deepspeed_script.py \

-- \

--deepspeed \

--deepspeed_config "$PWD/deepspeed_config.json"

DeepSpeed配置:deepspeed_config.json

当使用配置JSON文件来启用、禁用或配置DeepSpeed功能时,需要注意一些核心参数。以下是关于定义训练批次大小、梯度累积步数、优化器类型和参数、FP16以及零优化等内容的指导:

训练批次大小 (train_batch_size):在配置文件中,可以通过指定一个整数值来设置训练批次的大小。这个值代表每个训练步骤中用于训练的样本数。梯度累积步数 (gradient_accumulation_steps):通过设置这个参数,可以定义梯度累积的步数。这意味着在执行优化器步骤之前,模型将进行多少次前向传播和反向传播。这对于处理大批量训练数据而内存有限的情况很有用。优化器类型和参数 (optimizer):在配置文件中,可以指定优化器的类型(如"Adam")并定义相应的参数,比如学习率 (lr)。这样可以配置模型训练中所使用的优化器及其超参数。FP16(半精度浮点数) (fp16):可以通过设置这个参数来启用或禁用FP16混合精度训练。将其设置为true表示启用FP16,以减少模型训练时的内存占用。零优化 (zero_optimization):这个参数用于启用或禁用零优化技术,即在模型训练中将零梯度忽略以减少计算。将其设置为true表示启用零优化。

{

"train_micro_batch_size_per_gpu": 1,

"gradient_accumulation_steps": 4,

"optimizer": {

"type": "Adam",

"params": {

"lr": 0.001,

"betas": [

0.8,

0.999

],

"eps": 1e-08,

"weight_decay": 3e-07

}

},

"scheduler": {

"type": "WarmupLR",

"params": {

"warmup_min_lr": 0,

"warmup_max_lr": 0.001,

"warmup_num_steps": 1000

}

},

"activation_checkpointing": {

"partition_activations": true,

"cpu_checkpointing": true,

"contiguous_memory_optimization": false,

"number_checkpoints": null,

"synchronize_checkpoint_boundary": false,

"profile": true

},

"fp16": {

"enabled": true,

"auto_cast": false,

"loss_scale": 0,

"initial_scale_power": 16,

"loss_scale_window": 1000,

"hysteresis": 2,

"consecutive_hysteresis": false,

"min_loss_scale": 1

},

"zero_optimization": {

"stage": 3,

"offload_param": {

"device": "cpu",

"pin_memory": true

},

"offload_optimizer": {

"device": "cpu",

"pin_memory": true

},

"contiguous_gradients": true,

"overlap_comm": true

}

}

实战:DeepSpeed训练StableDiffusion和Transformer

https://www.bilibili.com/video/BV1By4y1F7pg/?spm_id_from=333.337.search-card.all.click&vd_source=b2549fdee562c700f2b1f3f49065201b

https://blog.csdn.net/qq_35812205/article/details/131607096

5. Accelerate

Accelerate 是一个库,旨在无需大幅修改代码的情况下完成并行化。除此之外, Accelerate 附带的数据 pipeline 还可以提高代码的性能。

首先,让我们将刚刚执行的所有上述代码封装到一个函数中,以帮助我们直观地看到差异:

DDP代码实现:该代码有点低效,因为每个设备都会创建一个 dataloader。这些代码只能运行在多 GPU 下,当想让这个代码运行在单个 GPU 或 TPU 时,还需要额外进行一些修改。

def train_ddp(rank, world_size):

device = "cuda"

ddp_setup(rank, world_size)

# Build DataLoaders

transform = transforms.Compose([

transforms.ToTensor(),

transforms.Normalize((0.1307), (0.3081))

])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)

test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)

test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

# Build model

model = model.to(rank)

ddp_model = DDP(model, device_ids=[rank])

# Build optimizer

optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

# Train for a single epoch

model.train()

for batch_idx, (data, target) in enumerate(train_loader):

data, target = data.to(device), target.to(device)

output = model(data)

loss = F.nll_loss(output, target)

loss.backward()

optimizer.step()

optimizer.zero_grad()

ddp_cleanup()

# Evaluate

model.eval()

correct = 0

with torch.no_grad():

for data, target in test_loader:

data, target = data.to(device), target.to(device)

output = model(data)

pred = output.argmax(dim=1, keepdim=True)

correct += pred.eq(target.view_as(pred)).sum().item()

print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

Accelerate代码实现:通过 Accelerator 类解决上述问题。通过它,不论是单节点还是多节点,除了三行代码外,其余代码几乎保持不变,如下所示:

# 代替ddp_setup()和ddp_cleaup()

accelerator = Accelerator()

# 代替DistributedDataParallel (model, device_ids=[self.gpu_id])和DistributedSampler(train_dataset)

# Send everything through `accelerator.prepare`

train_loader, test_loader, model, optimizer = accelerator.prepare(train_loader, test_loader, model, optimizer)

# 代替loss.backward()

accelerator.backward(loss)

使用 Accelerator 改造后的代码仍然可以通过 torchrun CLI 或通过 Accelerate 自己的 CLI 界面启动

def train_ddp_accelerate():

accelerator = Accelerator()

# Build DataLoaders

transform = transforms.Compose([

transforms.ToTensor(),

transforms.Normalize((0.1307), (0.3081))

])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)

test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)

test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

# Build model

model = None

# Build optimizer

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

# Send everything through `accelerator.prepare`

train_loader, test_loader, model, optimizer = accelerator.prepare(

train_loader, test_loader, model, optimizer

)

# Train for a single epoch

model.train()

for batch_idx, (data, target) in enumerate(train_loader):

output = model(data)

loss = F.nll_loss(output, target)

accelerator.backward(loss)

optimizer.step()

optimizer.zero_grad()

# Evaluate

model.eval()

correct = 0

with torch.no_grad():

for data, target in test_loader:

data, target = data.to(device), target.to(device)

output = model(data)

pred = output.argmax(dim=1, keepdim=True)

correct += pred.eq(target.view_as(pred)).sum().item()

print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

6. Trainer

终于我们来到了最高级的 API——Hugging Face Trainer. 它涵盖了尽可能多的训练类型,同时仍然能够在分布式系统上进行训练,用户根本不需要做任何事情。

首先我们需要导入 Trainer:

from transformers import Trainer

然后我们定义一些 TrainingArguments 来控制所有常用的超参数。 Trainer 需要的训练数据是字典类型的,因此需要制作自定义整理功能。

最后,我们将训练器子类化并编写我们自己的 compute_loss.

之后,这段代码也可以分布式运行,而无需修改任何训练代码!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(

"basic-trainer",

per_device_train_batch_size=64,

per_device_eval_batch_size=64,

num_train_epochs=1,

evaluation_strategy="epoch",

remove_unused_columns=False

)

def collate_fn(examples):

pixel_values = torch.stack([example[0] for example in examples])

labels = torch.tensor([example[1] for example in examples])

return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):

def compute_loss(self, model, inputs, return_outputs=False):

outputs = model(inputs["x"])

target = inputs["labels"]

loss = F.nll_loss(outputs, target)

return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(

model,

training_args,

train_dataset=train_dset,

eval_dataset=test_dset,

data_collator=collate_fn,

)

trainer.train()

***** Running training *****

Num examples = 60000

Num Epochs = 1

Instantaneous batch size per device = 64

Total train batch size (w. parallel, distributed & accumulation) = 64

Gradient Accumulation steps = 1

Total optimization steps = 938

参考链接

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: