2025-11-05 从零构建大模型—针对分类的微调
发布于 2025年11月05日 • 3 分钟 • 571 字
Table of contents
这本书最后两章的例子9月底就运行完了,但是微信读书会员到期了。最近又开了会员复习了一遍,记录下来。
微调通常可分为以下三个阶段:
第一阶段:数据准备
包括下载数据集、进行数据预处理以及构建数据加载器。
第二阶段:模型准备
涵盖模型初始化、加载预训练权重、调整模型结构,并实现评估工具。
第三阶段:模型微调与部署
包括执行模型微调、评估微调效果,以及在新数据上进行推理。
在语言模型微调中,主要分为分类微调和指令微调两种类型。分类微调相对简单,例如在垃圾消息检测任务中,模型只需输出"垃圾消息"或"非垃圾消息"两类结果,适用于对分类精度要求较高的场景。本章重点在于通过调整模型结构来实现微调,因此可以重点关注模型结构的修改方法。
第一阶段:准备数据
1. 下载数据
网上有公开的垃圾邮件和非垃圾邮件数据集可供下载。
2. 数据集预处理
预处理时主要需要注意设置max_length进行截断和填充。另外,在分类任务中需要特别关注各类别数据的均衡性。
from uu import encode
import torch
from torch.utils.data import Dataset
import tiktoken
import pandas as pd
class SpamDataset(Dataset):
def __init__(self, csv_file, tokenizer, max_len=200,pad_token_id=50256):
self.csv_file = csv_file
self.data = pd.read_csv(csv_file, sep='\\t')
self.encoded_texts =[tokenizer.encode(text) for text in self.data['text']]
self.max_len = max_len
if self.max_len:
processed_texts = []
for encoded in self.encoded_texts:
# 截断到最大长度
truncated = encoded[:self.max_len]
# 填充到最大长度
padded = truncated + [pad_token_id] * (self.max_len - len(truncated))
processed_texts.append(padded)
self.encoded_texts = processed_texts
def __getitem__(self, index):
encoded = self.encoded_texts[index]
label = self.data.iloc[index]['label']
return (torch.tensor(encoded, dtype=torch.long), torch.tensor(label, dtype=torch.long))
def __len__(self):
return len(self.data)
3. 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_worker)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_worker)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_worker)
第二阶段:修改模型
1. 模型初始化
GPTModel是在《徒手组装GPT》章节中构建好的类。
BASE_CONFIG = {
"vocab_size": 50257,
"context_length": 1024,
"drop_rate": 0.0,
"qkv_bias": True
}
models_config = {
"gpt2": {"emb_dim":768,"n_heads":12, "n_layers":12,},
"gpt2-medium": {"emb_dim":1024,"n_heads":16, "n_layers":24,},
"gpt2-large": {"emb_dim":1280,"n_heads":20, "n_layers":36,},
"gpt2-xl": {"emb_dim":1600,"n_heads":25, "n_layers":48,},
}
CHOOSE_MODEL= "gpt2-xl"
BASE_CONFIG.update(models_config[CHOOSE_MODEL])
model_size = CHOOSE_MODEL.split(" ")[-1].lstrip("(").rstrip(")")
model = GPTModel(BASE_CONFIG)
2. 加载预训练权重
这里也是之前的章节写过的。
params = build_openai_numpy_dict(CHOOSE_MODEL)
load_weights_into_gpt(model, params)
model.eval()
3. 修改模型结构
这是本章最重要的部分。
第一步:确定微调策略 需要考虑是微调选定层还是微调所有层。在基于神经网络的语言模型中,较低层通常捕捉基本的语言结构和语义,适用于广泛的任务和数据集,而最后几层(靠近输出的层)更侧重于捕捉细微的语言模式和特定任务的特征。因此,通常选择微调靠近输出层的少数层。
第二步:修改输出层 原始输出层的输出维度是词汇表大小(50257),现在要进行二分类,需要将输出节点的数量与类别数量相匹配。
GPTModel的结构如下:
GPTModel(
(tok_emb): Embedding(50257, 768)
(pos_emb): Embedding(1024, 768)
(drop_emb): Dropout(p=0.0, inplace=False)
(trf_blocks): Sequential(
...
(11): TransformerBlock(
(att): MultiHeadAttention(
(W_query): Linear(in_features=768, out_features=768, bias=True)
(W_key): Linear(in_features=768, out_features=768, bias=True)
(W_value): Linear(in_features=768, out_features=768, bias=True)
(out_proj): Linear(in_features=768, out_features=768, bias=True)
(dropout): Dropout(p=0.0, inplace=False)
)
(ff): FeedForward(
(layers): Sequential(
(0): Linear(in_features=768, out_features=3072, bias=True)
(1): GELU()
(2): Linear(in_features=3072, out_features=768, bias=True)
)
)
(norm1): LayerNorm()
(norm2): LayerNorm()
(drop_resid): Dropout(p=0.0, inplace=False)
)
)
(final_norm): LayerNorm()
(out_head): Linear(in_features=768, out_features=50257, bias=False)
)
这里需要将out_head替换为新的输出层,并对其进行微调。
具体操作:
首先冻结模型,将所有层设为不可训练:
for param in model.parameters():
param.requires_grad = False
然后添加分类层(新添加的输出层的requires_grad属性默认设置为True):
torch.manual_seed(123)
num_classes = 2
model.out_head = torch.nn.Linear(
in_features=BASE_CONFIG["emb_dim"],
out_features=num_classes
)
最后添加少量其他层参与微调,将最后一个Transformer块和连接该块到输出层的最终层归一化模块设置为可训练:
for param in model.trf_blocks[-1].parameters():
param.requires_grad = True
for param in model.final_norm.parameters():
param.requires_grad = True
4. 实现评估工具
我们的目标是最大化模型的垃圾消息分类准确率,因此需要定义训练期间要优化的损失函数。
这里需要实现两个功能:计算分类准确率和计算分类损失。
计算分类准确率:
def calc_accuracy_loader(data_loader, model, device, num_batches=None):
model.eval()
correct_predictions, num_examples =0,0
if num_batches is None:
num_batches =len(data_loader)
else:
num_batches = min(num_batches, len(data_loader))
for i, (input_batch, target_batch) in enumerate(data_loader):
if i < num_batches:
input_batch =input_batch.to(device)
target_batch =target_batch.to(device)
with torch.no_grad():
logits = model(input_batch)
logits = logits[:, -1, :] # (B, 2)
predicted_label = torch.argmax(logits, dim=-1)
num_examples += predicted_label.shape[0]
correct_predictions += (predicted_label == target_batch).sum().item()
else:
break
return correct_predictions / num_examples
GPT2模型通过softmax函数将50257个输出转换为概率,然后利用argmax函数返回最高概率的位置,来计算大语言模型生成的下一个词元的词元ID。
代码中分类模型也是通过查找最高概率分数的索引位置获得类别标签:
logits = outputs[:, -1, :]
label = torch.argmax(logits)
print("Class label:", label.item())
计算损失函数: 使用交叉熵损失作为替代来最大化准确率。这里要注意的是优化最后一个词元而不是所有词元:logits = model(input_batch)[:, -1, :]。
为什么只需要关注最后一个输出词元?
如果输入n个词元,输出也会是n个词元。按照因果注意力掩码机制,每个词元只能关注当前及之前的位置,从而确保每个词元只受自己和之前词元的影响。因此,最后一个词元能够知晓之前所有数据的信息,所以我们重点关注最后一个输出词元。
def calc_loss_batch(input_batch, output_batch, model, device):
input_batch = input_batch.to(device)
output_batch = output_batch.to(device)
logits = model(input_batch)[:, -1, :]
loss = torch.nn.functional.cross_entropy(logits, output_batch)
return loss
def calc_loss_loader(data_loader, model, device, num_batches=None):
total_loss =0.0
if len(data_loader) == 0:
return float("nan")
elif num_batches is None:
num_batches =len(data_loader)
else:
num_batches = min(num_batches, len(data_loader))
for i, (input_batch, target_batch) in enumerate(data_loader):
if i < num_batches:
loss = calc_loss_batch(input_batch, target_batch, model, device)
total_loss += loss.item()
else:
break
return total_loss / num_batches
第三阶段:微调模型
1. 训练代码实现
在每次循环中,我们计算每个训练集批次的损失以确定损失梯度,然后使用这些梯度来更新模型权重,以便训练集损失最小化。
训练步骤:
- 遍历训练轮次(一轮就是完整地遍历一次训练集)
- 在每个训练轮次中遍历批次(批次数量由训练集大小除以每个批次的大小确定)
- 从上一个批次迭代中重置损失函数
- 计算当前批次的损失
- 反向传播以计算损失梯度
- 使用损失梯度更新模型权重
- 打印训练集和验证集的损失(可选) 训练代码:
for epoch in range(num_epochs): # step 1
epoch_loss = 0.0
epoch_batches = 0
model.train()
for input_batch, output_batch in train_loader: # step 2
optimizer.zero_grad() # step 3
loss = calc_loss_batch(input_batch, output_batch, model, device) # step 4
loss.backward()# step 5
optimizer.step() # step 6
global_step += 1
epoch_loss += loss.item()
epoch_batches += 1
if global_step % eval_freq ==0:
train_loss, val_loss = evaluate_model(
model, train_loader, val_loader, device, eval_iter
)
train_osses.append(train_loss)
val_losses.append(val_loss)
def evaluate_model(model, train_loader, val_loader, device, eval_iter):
model.eval()
with torch.no_grad():
train_loss = calc_loss_loader(
train_loader, model, device, num_batches=eval_iter
)
val_loss = calc_loss_loader(
val_loader, model, device, num_batches=eval_iter
)
model.train()
return train_loss, val_loss
训练过程中的关键问题:
- 如何选择训练轮数? 通常情况下,5轮是一个不错的起点。如果模型在前几轮之后出现过拟合,则可能需要减少轮数。相反,如果趋势表明验证集损失可能随着进一步训练而改善,则应该增加轮数。
- 如何选择批次大小? 最开始一般取单卡GPU最大可塞下的batch_size的0.7倍。
2. 评估微调后的模型
可以通过计算损失函数值和对测试集进行推理生成来评估模型性能。
3. 在新数据上使用模型进行推理
主要是模型的保存和加载:
torch.save(model.state_dict(), "review_classifier.pth")