徐慧志的个人博客

2025-09-14 从零构建大模型—在无监督数据上进行预训练

发布于 2025年09月14日  •  6 分钟  • 1119 字
Table of contents

这一章主要分为以下三个部分:

一、前情提要

文本生成(前几章讲过的)

步骤:

  1. 分词器将输入文本转换成一系列词元ID
  2. 模型接收词元ID,并生成相应的logits
  3. 这些logits被转换回词元ID,分词器会将其解码为人类可读的文本 logits是表示词汇表中每个词元的概率分布的向量。

logits 怎么理解:

logits 是“未归一化”的概率分数向量。经过 softmax 后,logits 变成“概率分布”。

logits 的用途:

在推理阶段,取 logits 的 argmax 即可得到每个位置最可能的词元;也可以对 logits 应用温度缩放、Top-K、核采样等技术,再做 multinomial 采样,以平衡多样性与一致性。

权重参数

权重参数指的是在训练过程调整的参数。

PyTorch允许通过model.parameters()方法直接访问模型的所有可训练参数(包括Weights和Biases)

二、评估文本生成

初始化

用GPT_CONFIG_124M字典初始化GPTModel类,注意这里只是搭了个框架,随机初始化权重,所以模型生成的文本也是随机生成。


from gpt2_module.gpt2 import GPTModel

GPT_CONFIG_124M ={
    "vocab_size":50257,
    "context_length":1024,
    "emb_dim":768,
    "n_heads":12,
    "n_layers":12,
    "drop_rate":0.1,
    "qkv_bias":False
}

def generate_text_simple(model, idx, max_new_tokens, context_size):
    for _ in range(max_new_tokens):
        # 将当前文本截断至大模型支持的长度
        idx_cond = idx[:, -context_size:]
        with torch.no_grad():
            logits = model(idx_cond)
        # 仅关注最后一个时间步的logits
        logits = logits[:, -1, :]
        # 转换为概率分布
        probs = torch.softmax(logits, dim=-1)
        # 采样下一个token
        idx_next = torch.argmax(probs, dim=-1, keepdim=True)
        # 拼接采样的token
        idx = torch.cat((idx, idx_next), dim=1)
    return idx

def text_to_token_ids(text, tokenizer):
    encoded = tokenizer.encode(text, allowed_special={'<|endoftext|>'})
    encoded_tensor = torch.tensor(encoded).unsqueeze(0)
    return encoded_tensor

def token_ids_to_text(token_ids, tokenizer):
    flat = token_ids.squeeze(0)
    return tokenizer.decode(flat.tolist())

start_context = "Every effort moves you"
tokenizer = tiktoken.get_encoding("gpt2")

model = GPTModel(GPT_CONFIG_124M)

token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(start_context, tokenizer),
    max_new_tokens=10,
    context_size=GPT_CONFIG_124M["context_length"]
)

print(token_ids_to_text(token_ids, tokenizer))

交叉熵损失的计算

模型训练的目标是增大与正确目标词元ID对应的索引位置的softmax概率,也就是最大化正确词元的可能性。

文本评估是衡量生成词元和目标之间的偏差程度。一般用交叉熵损失。

整个过程如下:

Logits → Softmax → 拿到正确位置概率 p[t] → 取负对数 → 平均 → 得到非负损失 → 用梯度下降把损失压向 0(即把正确词元的 softmax 概率往 1 推)。

  1. logits: 模型最后一层输出向量 z ∈ ℝ^V,V 是词表大小。
  2. Softmax:概率 p = Softmax(z) ⇒  p[i] = e^{z[i]} / Σ_j e^{z[j]},这一步把任意实数 logits 变成概率分布,0≤p[i]≤1,Σp[i]=1。
  3. 目标概率(one-hot) 真实标签用 one-hot 向量 y 表示:y[t]=1,其余为 0,t 是目标词元 ID。
  4. 对数概率我们关心的是“正确位置的概率”,即 p[t]。取 log 后得到 log p[t] ∈ (−∞,0]。
  5. 平均对数概率:一个 batch 里就是L̄ = 1/N Σ log p[t_i]
  6. 负平均对数概率 = 交叉熵损失。

反向传播

如何最大化与目标词元对应的softmax概率值呢?大致思路是,更新模型权重,以便模型为我们想要生成的相应词元ID输出更高的值。这个过程叫反向传播。

# 输入
inputs = torch.tensor([[16833, 3626, 6100],
                       [40,    1107, 588]])

with  torch.no_grad():
    logits = model(inputs)
probas = torch.softmax(logits, dim=-1)

# 目标
targets = torch.tensor([[3626,6100,345],[1107,588,11311]])

probas 是形状为 [批次大小, 词元数量, 词汇表大小] 的张量,probas 里的每个元素已经是 softmax 归一化后的概率;接下来只需按真实标签索引出对应概率,再做“取对数 → 求平均 → 取反”即可得到可用于训练的损失值。

#按批次和序列位置锁定目标词元
text_idx = 0
target_probas_1 = probas[text_idx, [0, 1, 2], targets[text_idx]]
print("Text 1:", target_probas_1)

在批次 0 的三个词元位置上,取出模型预测目标词元(targets[0])处的概率值,得到长度为 3 的概率向量 target_probas_1。[0, 1, 2] 表示在该句子的 前3 个词元位置上同时取值。

接下来,对这 3 个概率取自然对数并求平均,再取相反数,就得到该批次的平均交叉熵损失:

# 计算交叉熵损失(或负对数似然)
log_probas = torch.log(torch.cat((target_probas_1, target_probas_2)))
avg_log_probas = torch.mean(log_probas)
neg_avg_log_probas = avg_log_probas * -1

关于交叉熵损失的理解

  1. 目标是通过在训练过程中更新模型的权重,使平均对数概率尽可能接近0。
  2. 通常的做法不是将平均对数概率升至0,而是将负平均对数概率降至0。负平均对数概率就是平均对数概率乘以-1。如何理解负值?

困惑度

困惑度指模型不确定应该选取词汇表的哪个词元来作为下一个词元。较低的困惑度表明模型的预测更接近实际分布。

三、训练

训练大语言模型一共有8个步骤:

(图源《从零构建大模型》图5-11)

从遍历每个训练轮次开始,处理批次,重置梯度,计算损失和新梯度,更新权重,最后以监控步骤(包括打印损失、生成文本样本等操作)结束。

语料

我用的是阿加莎的小说作为训练语料。

4.50 from Paddington是 阿加莎·克里斯蒂1957 年出版的侦探小说,属于“马普尔小姐”系列第7部。它的 Characters为378644,Tokens为107433。

这个语料有10万个词元。

未经训练,模型在语料上的loss为 Training loss: 11.0156,Validation loss: 11.0129。

关于训练,我的设置是23个训练集批次,每个批次包含4个样本,每个样本包含1024个词元。

epoch

1 个 epoch表示:模型已经“看”完整个训练集一次。

在阿加莎这个语料里就是:

  1. 把 107 433 个 token 按 1024 token 一段、无重叠地切成 ~105 段;
  2. 把这 105 段全部前向+反向传播一遍;
  3. 因为 batch_size=4,所以分成 27 个 step 做完。 做完这 27 步,就叫 1 个 epoch。

和深度学习训练不一样的地方

训练时不用epoch而用max_steps或者max_tokens的概念。

  1. 数据不再是静态文件,而是动态采样 很多大规模训练用去重、实时过滤、混合多语种,数据管道永远有新样本,根本没有“遍历完一次”这个概念。
  2. 学习率、评估、checkpoint 都按步数调度

结果

运行了2700步之后,结果是:

为什么全是 � / 重复引号?

改进

于是我把“贪婪 argmax”换成了“多项式采样 ”

之前

for _ in range(max_new):
    idx_crop = idx[:, -cfg.ctx_len:]
    logits = model(idx_crop)[:, -1, :]
    probs  = torch.softmax(logits, dim=-1)
    nxt    = torch.argmax(probs, dim=-1, keepdim=True)
    idx    = torch.cat((idx, nxt), dim=1)

现在


for _ in range(cfg.gen_max_tokens):
    idx_crop = idx[:, -cfg.ctx_len:]
    logits = model(idx_crop)[:, -1, :]
    probs  = torch.softmax(logits, dim=-1)
    nxt    = torch.multinomial(probs, num_samples=1)   # 采样避免死循环
    idx    = torch.cat((idx, nxt), dim=1)

现在的结果如下,比之前好了一些。

训练Trace

训练脚本

#!/usr/bin/env python3
"""
极简 GPT-2 124M 训练脚本
"""
import os
import sys
import math
import torch
import torch.nn as nn
import tiktoken
from dataclasses import dataclass
from pathlib import Path
import swanlab

project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, project_root)

from gpt2_module.gpt2 import GPTModel
from data_process.dataloader import create_dataloader

# ------------------------------------------------------------------
# 1. 所有超参、路径、训练设置 → 一个类管到底
# ------------------------------------------------------------------
@dataclass
class Config:
    # ---- 模型 ----
    vocab_size: int = 50_257
    ctx_len: int = 1_024
    emb_dim: int = 768
    n_heads: int = 12
    n_layers: int = 12
    drop_rate: float = 0.1
    qkv_bias: bool = False

    # ---- 数据 ----
    data_file: str = "/home/data/sophia/resource/4_50 From Paddington - Agatha Christie.txt"
    train_ratio: float = 0.90
    tokenizer_name: str = "gpt2"

    # ---- 训练 ----
    batch_size: int = 8
    max_steps: int = 2_700       # 用步数控制训练长度(比 epoch 更直观)
    eval_freq: int = 100           # 每 N 步验证 & 打印样本
    eval_iter: int = 50            # 验证时只看前 N 个 batch,加快评估
    lr: float = 4e-3
    weight_decay: float = 0.10
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    seed: int = 123

    # ---- 生成 ----
    start_context: str = "Every effort moves you"
    gen_max_tokens: int = 50

# ------------------------------------------------------------------
# 2. 工具函数
# ------------------------------------------------------------------
def set_seed(seed: int):
    torch.manual_seed(seed)

def load_text(path: str) -> str:
    with open(path, encoding="utf-8") as f:
        return f.read()

def get_tokenizer(name: str):
    return tiktoken.get_encoding(name)

def text_to_ids(text: str, tokenizer) -> torch.Tensor:
    return torch.tensor(tokenizer.encode(text, allowed_special={"<|endoftext|>"})).unsqueeze(0)

def ids_to_text(ids: torch.Tensor, tokenizer) -> str:
    return tokenizer.decode(ids.squeeze(0).tolist())

# ------------------------------------------------------------------
# 3. 数据
# ------------------------------------------------------------------
def build_loaders(cfg: Config, tokenizer):
    text = load_text(cfg.data_file)
    split_idx = int(cfg.train_ratio * len(text))
    train_data, val_data = text[:split_idx], text[split_idx:]

    # 用 stride=ctx_len 实现无重叠切块
    train_loader = create_dataloader(
        train_data, cfg.batch_size, cfg.ctx_len,
        stride=cfg.ctx_len, drop_last=True, shuffle=True, num_workers=0
    )
    val_loader = create_dataloader(
        val_data, cfg.batch_size, cfg.ctx_len,
        stride=cfg.ctx_len, drop_last=False, shuffle=False, num_workers=0
    )
    return train_loader, val_loader

# ------------------------------------------------------------------
# 4. 模型 + 损失
# ------------------------------------------------------------------
def build_model(cfg: Config) -> GPTModel:
    model = GPTModel({
        "vocab_size": cfg.vocab_size,
        "context_length": cfg.ctx_len,
        "emb_dim": cfg.emb_dim,
        "n_heads": cfg.n_heads,
        "n_layers": cfg.n_layers,
        "drop_rate": cfg.drop_rate,
        "qkv_bias": cfg.qkv_bias
    })
    return model.to(cfg.device)

@torch.no_grad()
def calc_loss_loader(loader, model, cfg, max_batch=None):
    model.eval()
    total, n_batch = 0., 0
    for i, (x, y) in enumerate(loader):
        if max_batch and i >= max_batch:
            break
        x, y = x.to(cfg.device), y.to(cfg.device)
        logits = model(x)
        loss = nn.functional.cross_entropy(
            logits.flatten(0, 1), y.flatten()
        )
        total += loss.item()
        n_batch += 1
    return total / n_batch

# ------------------------------------------------------------------
# 5. 训练闭环
# ------------------------------------------------------------------
def train(cfg: Config, model, train_loader, val_loader, tokenizer):
    optimizer = torch.optim.AdamW(model.parameters(), lr=cfg.lr, weight_decay=cfg.weight_decay)
    model.train()
    step = 0
    train_losses, val_losses = [], []

    while step < cfg.max_steps:
        for x, y in train_loader:
            x, y = x.to(cfg.device), y.to(cfg.device)
            optimizer.zero_grad()
            logits = model(x)
            loss = nn.functional.cross_entropy(logits.flatten(0, 1), y.flatten())
            loss.backward()
            optimizer.step()

            step += 1
            # 每步都记录训练 loss & lr
            swanlab.log({"train/loss": loss.item(),
                         "train/lr": optimizer.param_groups[0]["lr"]}, step=step)

            if step % cfg.eval_freq == 0 or step == cfg.max_steps:
                train_loss = calc_loss_loader(train_loader, model, cfg, max_batch=cfg.eval_iter)
                val_loss   = calc_loss_loader(val_loader,  model, cfg, max_batch=cfg.eval_iter)
                # 记录验证指标
                swanlab.log({"eval/train_loss": train_loss,
                             "eval/val_loss": val_loss}, step=step)

                model.eval()
                idx = text_to_ids(cfg.start_context, tokenizer).to(cfg.device)
                with torch.no_grad():
                    for _ in range(cfg.gen_max_tokens):
                        idx_crop = idx[:, -cfg.ctx_len:]
                        logits = model(idx_crop)[:, -1, :]
                        probs  = torch.softmax(logits, dim=-1)
                        nxt    = torch.multinomial(probs, num_samples=1)   # 采样避免死循环
                        idx    = torch.cat((idx, nxt), dim=1)
                text = ids_to_text(idx, tokenizer)
                swanlab.log({"eval/sample": swanlab.Text(text)}, step=step)
                model.train()

            if step >= cfg.max_steps:
                break
    return train_losses, val_losses

# ------------------------------------------------------------------
# 6. 生成
# ------------------------------------------------------------------
@torch.no_grad()
def generate_sample(cfg: Config, model, tokenizer):
    model.eval()
    idx = text_to_ids(cfg.start_context, tokenizer).to(cfg.device)
    max_new = cfg.gen_max_tokens

    for _ in range(max_new):
        idx_crop = idx[:, -cfg.ctx_len:]          # 截断上下文
        logits = model(idx_crop)[:, -1, :]        # 取最后一个时间步
        probs  = torch.softmax(logits, dim=-1)
        nxt    = torch.argmax(probs, dim=-1, keepdim=True)
        idx    = torch.cat((idx, nxt), dim=1)

    text = ids_to_text(idx, tokenizer)
    print("->" + text.replace("\n", " ") + "\n")
    model.train()

# ------------------------------------------------------------------
# 7. main
# ------------------------------------------------------------------
def main():
    cfg = Config()
    swanlab.init(
        project="gpt2-124m-pretrain",
        experiment_name=f"lr{cfg.lr}-bs{cfg.batch_size}-steps{cfg.max_steps}",
        config=vars(cfg),          # 把所有超参存成表格
    )

    set_seed(cfg.seed)
    tokenizer = get_tokenizer(cfg.tokenizer_name)
    train_loader, val_loader = build_loaders(cfg, tokenizer)
    model = build_model(cfg)

    print("initial loss | train:", calc_loss_loader(train_loader, model, cfg, cfg.eval_iter),
          "| val:", calc_loss_loader(val_loader, model, cfg, cfg.eval_iter))

    train(cfg, model, train_loader, val_loader, tokenizer)

    # 保存最终权重
    ckpt_path = os.path.join(PROJECT_ROOT, "gpt2_124M_final.pt")
    torch.save(model.state_dict(), ckpt_path)
    print("saved ->", ckpt_path)
    swanlab.finish()

if __name__ == "__main__":
    PROJECT_ROOT = Path(__file__).resolve().parent
    main()

运行

CUDA_VISIBLE_DEVICES=0,1,2,4 python -u /home/data/sophia/pretraining/train.py | tee run.log
Sein heißt werden, leben heißt lernen.

Der einfache Weg is immer verkehrt.