Parallel training code optimization

This commit is contained in:
lee
2025-07-02 18:02:28 +08:00
parent 2219c0a303
commit 5deaf4727f
5 changed files with 918 additions and 330 deletions

View File

@@ -3,6 +3,18 @@
<component name="CopilotChatHistory">
<option name="conversations">
<list>
<Conversation>
<option name="createTime" value="1751441743239" />
<option name="id" value="0197ca101d8771bd80f2bc4aaf1a8f19" />
<option name="title" value="新对话 2025年7月02日 15:35:43" />
<option name="updateTime" value="1751441743239" />
</Conversation>
<Conversation>
<option name="createTime" value="1751441398488" />
<option name="id" value="0197ca0adad875168de40d792dcb7b4c" />
<option name="title" value="新对话 2025年7月02日 15:29:58" />
<option name="updateTime" value="1751441398488" />
</Conversation>
<Conversation>
<option name="createTime" value="1750474299387" />
<option name="id" value="0197906617fb7194a0407baae2b1e2eb" />

File diff suppressed because one or more lines are too long

View File

@@ -14,6 +14,7 @@ from tools.dataset import get_transform
from tools.image_joint import merge_imgs
from configs import trainer_tools
import yaml
from datetime import datetime
with open('configs/test.yml', 'r') as f:
conf = yaml.load(f, Loader=yaml.FullLoader)

View File

@@ -5,6 +5,10 @@ import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torch.nn.parallel import DistributedDataParallel as DDP
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.utils.data.distributed
from model.loss import FocalLoss
from tools.dataset import load_data
@@ -12,136 +16,294 @@ import matplotlib.pyplot as plt
from configs import trainer_tools
import yaml
from datetime import datetime
def load_config(config_path='configs/scatter.yml'):
    """Load the YAML configuration file."""
    with open(config_path, 'r') as f:
        return yaml.load(f, Loader=yaml.FullLoader)


# Load configuration
conf = load_config()
# Data-loading wrapper
def load_datasets():
"""加载训练和验证数据集,并为分布式训练创建 DistributedSampler."""
train_dataset, class_num = load_data(training=True, cfg=conf)
val_dataset, _ = load_data(training=False, cfg=conf)
if conf['base']['distributed']:
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
val_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset)
else:
train_sampler = None
val_sampler = None
train_dataloader = torch.utils.data.DataLoader(
train_dataset,
batch_size=conf['data']['batch_size'],
shuffle=(train_sampler is None),
num_workers=conf['data']['num_workers'],
pin_memory=conf['data']['pin_memory'],
sampler=train_sampler
)
val_dataloader = torch.utils.data.DataLoader(
val_dataset,
batch_size=conf['data']['batch_size'],
shuffle=False,
num_workers=conf['data']['num_workers'],
pin_memory=conf['data']['pin_memory'],
sampler=val_sampler
)
return train_dataloader, val_dataloader, class_num
# Load datasets
train_dataloader, val_dataloader, class_num = load_datasets()
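# Note: with a DistributedSampler, per-epoch shuffling only changes if the sampler's epoch
# is set. A minimal sketch, assuming load_datasets() were extended to also return
# train_sampler, would call this at the start of every epoch:
#
#     if train_sampler is not None:
#         train_sampler.set_epoch(e)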
tr_tools = trainer_tools(conf)
backbone_mapping = tr_tools.get_backbone()
metric_mapping = tr_tools.get_metric(class_num)
# Device management wrapper
def get_device(device_config=None):
    """Return the device (CPU/GPU) from the config; in distributed mode, initialize the process group."""
    if device_config is None:
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    else:
        device = torch.device(device_config)
    # Only initialize here if it has not been done yet; run_training() initializes the
    # process group itself in the mp.spawn path.
    if conf['base']['distributed'] and not dist.is_initialized():
        dist.init_process_group(backend='nccl')
    return device


# Select device
device = get_device(conf['base']['device'])
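# Note: dist.init_process_group(backend='nccl') with no explicit rank/world_size reads
# RANK, WORLD_SIZE, MASTER_ADDR and MASTER_PORT from the environment, i.e. it assumes the
# script was started by a distributed launcher. A minimal sketch of suitable defaults for
# a single-node run (the address and port values here are assumptions, adjust as needed):
#
#     os.environ.setdefault('MASTER_ADDR', '127.0.0.1')
#     os.environ.setdefault('MASTER_PORT', '29500')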
# Model initialization
if conf['models']['backbone'] in backbone_mapping:
    model = backbone_mapping[conf['models']['backbone']]().to(device)
else:
    raise ValueError('Unsupported backbone: {}'.format(conf['models']['backbone']))

if conf['training']['metric'] in metric_mapping:
    metric = metric_mapping[conf['training']['metric']]().to(device)
else:
    raise ValueError('Unsupported metric type: {}'.format(conf['training']['metric']))
rank = 0
if torch.cuda.device_count() > 1 and conf['base']['distributed']:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    dist.barrier()
    model = DDP(model, device_ids=[rank])
    metric = DDP(metric, device_ids=[rank])
# Training Setup
def initialize_components(model, metric):
    # Build the loss function, optimizer and scheduler; the model/metric are passed in
    # explicitly so that the per-process (DDP-wrapped) modules are the ones being optimized.
    if conf['training']['loss'] == 'focal_loss':
        criterion = FocalLoss(gamma=2)
    else:
        criterion = nn.CrossEntropyLoss()

    optimizer_mapping = tr_tools.get_optimizer(model, metric)
    if conf['training']['optimizer'] in optimizer_mapping:
        optimizer = optimizer_mapping[conf['training']['optimizer']]()
        scheduler_mapping = tr_tools.get_scheduler(optimizer)
        scheduler = scheduler_mapping[conf['training']['scheduler']]()
        print('Using {} optimizer with {} scheduler'.format(conf['training']['optimizer'],
                                                             conf['training']['scheduler']))
        return criterion, optimizer, scheduler
    else:
        raise ValueError('Unsupported optimizer type: {}'.format(conf['training']['optimizer']))


# Initialize training components
criterion, optimizer, scheduler = initialize_components(model, metric)
# Checkpoints Setup
checkpoints = conf['training']['checkpoints']
os.makedirs(checkpoints, exist_ok=True)
def train_epoch(model, dataloader, optimizer, criterion, device):
model.train()
train_loss = 0
for data, labels in tqdm(dataloader, desc="Training", ascii=True, total=len(dataloader)):
data, labels = data.to(device), labels.to(device)
embeddings = model(data).to(device)
if not conf['training']['metric'] == 'softmax':
thetas = metric(embeddings, labels)
else:
thetas = metric(embeddings)
loss = criterion(thetas, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
return train_loss / len(dataloader)
def validate_epoch(model, dataloader, criterion, device):
model.eval()
val_loss = 0
with torch.no_grad():
for data, labels in tqdm(dataloader, desc="Validation", ascii=True, total=len(dataloader)):
data, labels = data.to(device), labels.to(device)
embeddings = model(data).to(device)
if not conf['training']['metric'] == 'softmax':
thetas = metric(embeddings, labels)
else:
thetas = metric(embeddings)
loss = criterion(thetas, labels)
val_loss += loss.item()
return val_loss / len(dataloader)
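# Note: train_epoch()/validate_epoch() call the metric head with labels for any non-softmax
# metric, which presumably covers margin-based heads (ArcFace-style heads need the target
# class to apply their angular margin), while a plain softmax head only receives the
# embeddings; which heads are available comes from trainer_tools.get_metric().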
def save_model(model, path, distributed):
if distributed and torch.cuda.device_count() > 1:
if dist.get_rank() == 0:
torch.save(model.module.state_dict(), path)
else:
torch.save(model.state_dict(), path)
def write_log(log_info, log_dir):
with open(log_dir, 'a') as f:
f.write(log_info + '\n')
def plot_losses(epochs, train_losses, val_losses, save_path):
plt.plot(epochs, train_losses, color='blue', label='Train Loss')
plt.plot(epochs, val_losses, color='red', label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.savefig(save_path)
plt.close()
# Model-restore wrapper
def restore_model(model, device):
if conf['training']['restore']:
print('load pretrain model: {}'.format(conf['training']['restore_model']))
model.load_state_dict(torch.load(conf['training']['restore_model'], map_location=device))
return model
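# Note: restore_model() loads a plain (unwrapped) state_dict, so it is best applied before
# the model is wrapped in DDP/DataParallel; otherwise the wrapper's 'module.' key prefix
# will not match the checkpoint keys.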
# Logging and learning-rate reporting wrapper
def log_and_print(e, train_loss_avg, val_loss_avg, current_lr, log_dir):
if conf['base']['distributed'] and dist.get_rank() != 0:
return
log_info = ("[{:%Y-%m-%d %H:%M:%S}] Epoch {}/{}, train_loss: {}, val_loss: {} lr:{}"
.format(datetime.now(),
e,
conf['training']['epochs'],
train_loss_avg,
val_loss_avg,
current_lr))
print(log_info)
write_log(log_info, log_dir)
print("%d个epoch的学习率%f" % (e, current_lr))
# Evaluation and checkpoint-saving wrapper
def evaluate_and_save(val_loss_avg, best_loss, model, checkpoints, distributed):
if val_loss_avg < best_loss:
best_path = osp.join(checkpoints, 'best.pth')
save_model(model, best_path, distributed)
best_loss = val_loss_avg
return best_loss
def run_training(rank, world_size, conf):
    """Run the training loop on a given rank.

    Args:
        rank: index of the current process.
        world_size: total number of processes.
        conf: configuration dict.
    """
    dist.init_process_group(backend='nccl', rank=rank, world_size=world_size)
    device = torch.device('cuda', rank)
    # Dataloaders, model, etc. are re-created here so that each process works independently
train_dataloader, val_dataloader, class_num = load_datasets()
tr_tools = trainer_tools(conf)
backbone_mapping = tr_tools.get_backbone()
metric_mapping = tr_tools.get_metric(class_num)
    # Model initialization
    if conf['models']['backbone'] in backbone_mapping:
        model = backbone_mapping[conf['models']['backbone']]().to(device)
    else:
        raise ValueError('Unsupported backbone: {}'.format(conf['models']['backbone']))
    if conf['training']['metric'] in metric_mapping:
        metric = metric_mapping[conf['training']['metric']]().to(device)
    else:
        raise ValueError('Unsupported metric type: {}'.format(conf['training']['metric']))
    # Restore pretrained weights (before DDP wrapping, so the checkpoint keys match)
    model = restore_model(model, device)
    model = DDP(model, device_ids=[rank])
    metric = DDP(metric, device_ids=[rank])
    # Initialize the loss, optimizer and scheduler from the per-process modules
    criterion, optimizer, scheduler = initialize_components(model, metric)
train_losses = []
val_losses = []
epochs = []
temp_loss = 1000
log_dir = osp.join(conf['logging']['logging_dir'])
for e in range(conf['training']['epochs']):
train_loss_avg = train_epoch(model, train_dataloader, optimizer, criterion, device)
train_losses.append(train_loss_avg)
val_loss_avg = validate_epoch(model, val_dataloader, criterion, device)
val_losses.append(val_loss_avg)
temp_loss = evaluate_and_save(val_loss_avg, temp_loss, model, conf['training']['checkpoints'], True)
scheduler.step()
current_lr = optimizer.param_groups[0]['lr']
log_and_print(e, train_loss_avg, val_loss_avg, current_lr, log_dir)
epochs.append(e)
last_path = osp.join(conf['training']['checkpoints'], 'last.pth')
save_model(model, last_path, True)
plot_losses(epochs, train_losses, val_losses, 'loss/mobilenetv3Large_2250_0316.png')
dist.destroy_process_group()
def main():
world_size = torch.cuda.device_count()
mp.spawn(
run_training,
args=(world_size, conf),
nprocs=world_size,
join=True,
)
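# Note: mp.spawn() passes the worker index as the first positional argument, so each child
# process runs run_training(rank, world_size, conf) with its own rank in [0, world_size).
# The explicit dist.init_process_group(..., rank=rank, world_size=world_size) call inside
# run_training() still needs MASTER_ADDR/MASTER_PORT to be set (see the sketch near
# get_device()) before main() is invoked.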
if __name__ == '__main__':
print('backbone>{} '.format(conf['models']['backbone']),
'metric>{} '.format(conf['training']['metric']),
'checkpoints>{} '.format(conf['training']['checkpoints']),
)
    if conf['base']['distributed']:
        main()
    else:
        run_training(rank=0, world_size=1, conf=conf)

147
train_compare.py.bak Normal file
View File

@@ -0,0 +1,147 @@
import os
import os.path as osp
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from model.loss import FocalLoss
from tools.dataset import load_data
import matplotlib.pyplot as plt
from configs import trainer_tools
import yaml
from datetime import datetime
with open('configs/scatter.yml', 'r') as f:
conf = yaml.load(f, Loader=yaml.FullLoader)
# Data Setup
train_dataloader, class_num = load_data(training=True, cfg=conf)
val_dataloader, _ = load_data(training=False, cfg=conf)
tr_tools = trainer_tools(conf)
backbone_mapping = tr_tools.get_backbone()
metric_mapping = tr_tools.get_metric(class_num)
if conf['models']['backbone'] in backbone_mapping:
model = backbone_mapping[conf['models']['backbone']]().to(conf['base']['device'])
else:
    raise ValueError('Unsupported backbone: {}'.format(conf['models']['backbone']))
if conf['training']['metric'] in metric_mapping:
metric = metric_mapping[conf['training']['metric']]()
else:
    raise ValueError('Unsupported metric type: {}'.format(conf['training']['metric']))
if torch.cuda.device_count() > 1 and conf['base']['distributed']:
print("Let's use", torch.cuda.device_count(), "GPUs!")
model = nn.DataParallel(model)
metric = nn.DataParallel(metric)
# Training Setup
if conf['training']['loss'] == 'focal_loss':
criterion = FocalLoss(gamma=2)
else:
criterion = nn.CrossEntropyLoss()
optimizer_mapping = tr_tools.get_optimizer(model, metric)
if conf['training']['optimizer'] in optimizer_mapping:
optimizer = optimizer_mapping[conf['training']['optimizer']]()
scheduler_mapping = tr_tools.get_scheduler(optimizer)
scheduler = scheduler_mapping[conf['training']['scheduler']]()
    print('Using {} optimizer with {} scheduler'.format(conf['training']['optimizer'],
                                                         conf['training']['scheduler']))
else:
    raise ValueError('Unsupported optimizer type: {}'.format(conf['training']['optimizer']))
# Checkpoints Setup
checkpoints = conf['training']['checkpoints']
os.makedirs(checkpoints, exist_ok=True)
if __name__ == '__main__':
print('backbone>{} '.format(conf['models']['backbone']),
'metric>{} '.format(conf['training']['metric']),
'checkpoints>{} '.format(conf['training']['checkpoints']),
)
train_losses = []
val_losses = []
epochs = []
temp_loss = 100
if conf['training']['restore']:
print('load pretrain model: {}'.format(conf['training']['restore_model']))
model.load_state_dict(torch.load(conf['training']['restore_model'],
map_location=conf['base']['device']))
for e in range(conf['training']['epochs']):
train_loss = 0
model.train()
for train_data, train_labels in tqdm(train_dataloader,
desc="Epoch {}/{}"
.format(e, conf['training']['epochs']),
ascii=True,
total=len(train_dataloader)):
train_data = train_data.to(conf['base']['device'])
train_labels = train_labels.to(conf['base']['device'])
train_embeddings = model(train_data).to(conf['base']['device']) # [256,512]
# pdb.set_trace()
if not conf['training']['metric'] == 'softmax':
thetas = metric(train_embeddings, train_labels) # [256,357]
else:
thetas = metric(train_embeddings)
tloss = criterion(thetas, train_labels)
optimizer.zero_grad()
tloss.backward()
optimizer.step()
train_loss += tloss.item()
train_lossAvg = train_loss / len(train_dataloader)
train_losses.append(train_lossAvg)
epochs.append(e)
val_loss = 0
model.eval()
with torch.no_grad():
for val_data, val_labels in tqdm(val_dataloader, desc="val",
ascii=True, total=len(val_dataloader)):
val_data = val_data.to(conf['base']['device'])
val_labels = val_labels.to(conf['base']['device'])
val_embeddings = model(val_data).to(conf['base']['device'])
if not conf['training']['metric'] == 'softmax':
thetas = metric(val_embeddings, val_labels)
else:
thetas = metric(val_embeddings)
vloss = criterion(thetas, val_labels)
val_loss += vloss.item()
val_lossAvg = val_loss / len(val_dataloader)
val_losses.append(val_lossAvg)
        if val_lossAvg < temp_loss:
            # Save the unwrapped weights when the model is wrapped in DataParallel
            if torch.cuda.device_count() > 1 and conf['base']['distributed']:
                torch.save(model.module.state_dict(), osp.join(checkpoints, 'best.pth'))
            else:
                torch.save(model.state_dict(), osp.join(checkpoints, 'best.pth'))
temp_loss = val_lossAvg
scheduler.step()
current_lr = optimizer.param_groups[0]['lr']
log_info = ("[{:%Y-%m-%d %H:%M:%S}] Epoch {}/{}, train_loss: {}, val_loss: {} lr:{}"
.format(datetime.now(),
e,
conf['training']['epochs'],
train_lossAvg,
val_lossAvg,
current_lr))
print(log_info)
        # Write to the log file
with open(osp.join(conf['logging']['logging_dir']), 'a') as f:
f.write(log_info + '\n')
print("%d个epoch的学习率%f" % (e, current_lr))
if torch.cuda.device_count() > 1 and conf['base']['distributed']:
torch.save(model.module.state_dict(), osp.join(checkpoints, 'last.pth'))
else:
torch.save(model.state_dict(), osp.join(checkpoints, 'last.pth'))
plt.plot(epochs, train_losses, color='blue')
plt.plot(epochs, val_losses, color='red')
# plt.savefig('lossMobilenetv3.png')
plt.savefig('loss/mobilenetv3Large_2250_0316.png')
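# Note: both scripts save plain state_dicts ('best.pth' and 'last.pth' under the configured
# checkpoints directory). A minimal reload sketch for evaluation, assuming the same backbone
# configuration used for training:
#
#     state = torch.load(osp.join(checkpoints, 'best.pth'), map_location='cpu')
#     model.load_state_dict(state)
#     model.eval()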