RLHF and DPO Preference Alignment Techniques
2025-02-16

1. Why Preference Alignment Is Needed

1.1 Pretraining vs. Alignment

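graph TB
    subgraph "Pretraining stage"
        A["Large-scale text"]
        B["Next-token prediction"]
        C["Language ability"]
    end
    subgraph "Alignment stage"
        D["Human preference data"]
        E["Reward Model"]
        F["Policy optimization"]
        G["Helpful / harmless / honest"]
    end
    A --> B --> C
    D --> E --> F --> G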
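| Stage | Objective | Training data | Capability gained |
| --- | --- | --- | --- |
| Pretraining | Predict the next token | Internet text | Language knowledge |
| SFT | Carry out tasks | Human demonstrations | Task ability |
| RLHF | Match human preferences | Human preference rankings | Alignment |
| DPO | Match human preferences | Preference pairs | Alignment (simplified pipeline) |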

1.2 Helpful, Honest, Harmless (HHH)

graph LR
    A["Helpful"] --> B["Harmless"]
    B --> C["Honest"]
    C --> A
    style A fill:#90EE90
    style B fill:#FFD700
    style C fill:#87CEEB

2. Reward Model Training

2.1 Reward Model Architecture

graph LR
    A["Prompt"] --> B["SFT model"]
    A --> C["Response"]
    B --> C
    C --> D["Reward Model"]
    D --> E["Scalar score"]
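import torch


class RewardModel(torch.nn.Module):
    def __init__(self, base_model):
        super().__init__()
        # base_model is a headless transformer whose output exposes
        # `last_hidden_state` (for example, one loaded with AutoModel).
        self.base_model = base_model
        # Replace the LM head with a scalar reward head.
        self.value_head = torch.nn.Linear(
            base_model.config.hidden_size,
            1,
            bias=False,
        )

    def forward(self, input_ids, attention_mask):
        outputs = self.base_model(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        hidden = outputs.last_hidden_state  # (batch, seq_len, hidden)
        # Predict the reward from the hidden state of the last real token.
        # Assumes right padding: indexing [:, -1, :] would read a padding
        # position for every sequence shorter than the longest one.
        last_idx = attention_mask.sum(dim=1) - 1
        batch_idx = torch.arange(hidden.size(0), device=hidden.device)
        last_hidden = hidden[batch_idx, last_idx]
        reward = self.value_head(last_hidden).squeeze(-1)  # (batch,)
        return reward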
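
The reward is read from the hidden state of the last token. With right-padded batches the final position is often padding, so the index usually has to be derived from the attention mask. A minimal pure-Python sketch of that index computation, assuming right padding and a 0/1 mask (the helper name `last_token_indices` is hypothetical):

```python
def last_token_indices(attention_mask):
    """Return, per sequence, the index of the last non-padding token.

    attention_mask: list of rows of 0/1 values, right-padded
    (assumption: in every row all 1s come before all 0s).
    """
    indices = []
    for row in attention_mask:
        real_tokens = sum(row)
        if real_tokens == 0:
            raise ValueError("sequence has no real tokens")
        indices.append(real_tokens - 1)
    return indices


mask = [
    [1, 1, 1, 1],  # no padding: last real token is at index 3
    [1, 1, 0, 0],  # two padding tokens: last real token is at index 1
]
print(last_token_indices(mask))
```

With left padding the last position is always a real token, so this lookup would not be needed.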

2.2 Building Preference Data

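from typing import Optional


class PreferenceDataBuilder:
    def __init__(self, annotation_interface, model):
        self.interface = annotation_interface
        # Text generator used by generate_candidates
        self.model = model

    def build_preference_data(self, prompts: list, responses: Optional[list] = None) -> list:
        """
        Build preference data: for each prompt, collect several responses
        and have human annotators mark which ones they prefer.

        responses: optional pre-collected candidates, one list per prompt.
        When omitted, candidates are sampled from self.model.
        """
        preference_data = []
        for i, prompt in enumerate(prompts):
            # Collect several candidate responses
            if responses is not None:
                candidates = responses[i]
            else:
                candidates = self.generate_candidates(prompt, num_choices=4)
            # Human preference annotation: the interface shows the prompt and
            # the candidates, and returns (chosen, rejected) pairs.
            annotations = self.interface.annotate(prompt, candidates)
            # Build pairwise preference records
            for chosen, rejected in annotations:
                preference_data.append({
                    "prompt": prompt,
                    "chosen": chosen,
                    "rejected": rejected,
                    "preference": 1,  # chosen is preferred over rejected
                })
        return preference_data

    def generate_candidates(self, prompt, num_choices=4):
        """Sample diverse candidate responses from the model."""
        candidates = []
        for _ in range(num_choices):
            # self.model.generate is assumed to map a prompt string to a
            # response string; adapt the call to your inference wrapper.
            response = self.model.generate(
                prompt,
                temperature=0.8,  # a fairly high temperature increases diversity
                top_p=0.95,
            )
            candidates.append(response)
        return candidates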
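
The builder expects the annotation interface to return (chosen, rejected) pairs. When annotators instead rank all candidates for a prompt, one common approach is to expand the ranking into every ordered pair, which yields K(K-1)/2 pairs for K candidates. A minimal sketch, assuming a best-to-worst list with no ties (the helper `ranking_to_pairs` is hypothetical):

```python
from itertools import combinations


def ranking_to_pairs(prompt, ranked_responses):
    """Expand a best-to-worst ranking into (chosen, rejected) records."""
    pairs = []
    # combinations preserves input order, so `better` always precedes `worse`
    for better, worse in combinations(ranked_responses, 2):
        pairs.append({"prompt": prompt, "chosen": better, "rejected": worse})
    return pairs


pairs = ranking_to_pairs("Explain KL divergence.", ["A", "B", "C", "D"])
print(len(pairs))  # 4 candidates give 6 pairs
print(pairs[0])
```

Pairs drawn from the same ranking are correlated, so they are usually kept together in one batch rather than shuffled across the dataset.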

2.3 Reward Model Training Loss

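import torch
import torch.nn.functional as F


def reward_model_loss(reward_chosen, reward_rejected, margin_weight=0.1):
    """
    Reward Model loss based on the Bradley-Terry model.
    Objective: P(chosen > rejected) = sigmoid(reward_chosen - reward_rejected)
    """
    # Reward gap between the chosen and the rejected response
    diff = reward_chosen - reward_rejected
    # Negative log-likelihood; logsigmoid is numerically safer than
    # log(sigmoid(x)) when the gap is a large negative number.
    loss = -F.logsigmoid(diff).mean()
    # Optional hinge (margin) term that pushes the gap above 1. It is not
    # part of the Bradley-Terry objective; set margin_weight=0 to disable it.
    margin_loss = torch.relu(1 - diff).mean()
    return loss + margin_weight * margin_loss


class RewardTrainer:
    def __init__(self, reward_model, optimizer):
        self.reward_model = reward_model
        self.optimizer = optimizer

    def training_step(self, batch):
        # Reward for the chosen response
        reward_chosen = self.reward_model(
            input_ids=batch["chosen_input_ids"],
            attention_mask=batch["chosen_attention_mask"],
        )
        # Reward for the rejected response
        reward_rejected = self.reward_model(
            input_ids=batch["rejected_input_ids"],
            attention_mask=batch["rejected_attention_mask"],
        )
        loss = reward_model_loss(reward_chosen, reward_rejected)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.detach()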
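
To make the Bradley-Terry objective concrete, the sketch below evaluates the preference probability and its negative log-likelihood for a few reward gaps, using plain floats instead of tensors. The function names `bt_probability` and `bt_loss` are illustrative and leave out the optional margin term.

```python
import math


def bt_probability(reward_chosen, reward_rejected):
    """P(chosen > rejected) under the Bradley-Terry model."""
    return 1.0 / (1.0 + math.exp(-(reward_chosen - reward_rejected)))


def bt_loss(reward_chosen, reward_rejected):
    """Negative log-likelihood of the observed preference."""
    return -math.log(bt_probability(reward_chosen, reward_rejected))


# A gap of 0 means the model cannot tell the pair apart: P = 0.5, loss = log(2).
for gap in (-2.0, 0.0, 2.0):
    print(gap, round(bt_probability(gap, 0.0), 4), round(bt_loss(gap, 0.0), 4))
```

Only the difference between the two rewards matters, so adding a constant to every reward leaves the loss unchanged.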

3. How PPO Works

3.1 PPO in RLHF

graph TB
    subgraph "RLHF loop"
        A["Policy π"] --> B["Generate response"]
        B --> C["Reward Model scoring"]
        C --> D["PPO update"]
        D --> A
    end
    subgraph "KL constraint"
        D --> E["KL(π || π_ref)"]
        E --> F["Added to the loss"]
    end
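
One common way to connect the reward and the KL constraint in this loop is to fold a per-token KL estimate into the reward and add the Reward Model score on the final token. The sketch below assumes that setup, which may differ from the trainer shown later; `shape_rewards` and its arguments are hypothetical names.

```python
def shape_rewards(rm_score, policy_logps, ref_logps, kl_coef=0.04):
    """Per-token rewards: -kl_coef * (log pi - log pi_ref), plus the
    reward-model score on the final token.

    policy_logps / ref_logps: log probabilities of the sampled tokens under
    the policy and under the reference (SFT) model, same length.
    """
    if len(policy_logps) != len(ref_logps):
        raise ValueError("log-prob lists must have the same length")
    if not policy_logps:
        raise ValueError("empty response")
    rewards = [-kl_coef * (lp - ref_lp)
               for lp, ref_lp in zip(policy_logps, ref_logps)]
    rewards[-1] += rm_score  # the sequence-level score arrives at the end
    return rewards


rewards = shape_rewards(
    rm_score=1.0,
    policy_logps=[-1.0, -2.0],
    ref_logps=[-1.5, -2.0],
    kl_coef=0.1,
)
print(rewards)
```

Tokens the policy makes more likely than the reference are charged a small penalty, while the last token also carries the full sequence score.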

3.2 The Core PPO Update

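import torch


class PPOTrainer:
    def __init__(self, policy, ref_policy, reward_model, optimizer,
                 kl_coef=0.04, epsilon=0.2):
        self.policy = policy
        self.ref_policy = ref_policy  # frozen SFT model used as the reference
        self.reward_model = reward_model
        self.optimizer = optimizer
        self.kl_coef = kl_coef
        self.epsilon = epsilon  # PPO clip range

    def compute_rewards(self, logits, responses, ref_log_probs):
        """
        Combine the Reward Model score with the KL penalty and turn the
        result into advantages via generalized advantage estimation (GAE).
        """
        with torch.no_grad():
            # 1. Immediate reward from the Reward Model.
            #    responses: dict holding the tokenized prompt + response
            #    (input_ids, attention_mask).
            rewards = self.reward_model(**responses)
            # 2. KL penalty (keeps the policy from drifting far from SFT)
            kl_penalty = self.kl_coef * self.compute_kl_penalty(
                logits, ref_log_probs
            )
        # 3. Generalized advantage estimation
        advantages = self.compute_gae(rewards, kl_penalty)
        return advantages

    def compute_gae(self, rewards, kl_penalty):
        # Left unimplemented here: a full version also needs per-token
        # value estimates from a critic (value head).
        raise NotImplementedError

    def compute_kl_penalty(self, logits, ref_log_probs):
        """
        Per-token KL(π || π_ref), summed over the vocabulary.
        ref_log_probs: log-softmax output of the reference model,
        same shape as logits.
        """
        log_probs_new = torch.log_softmax(logits, dim=-1)
        kl = torch.exp(log_probs_new) * (log_probs_new - ref_log_probs)
        return kl.sum(dim=-1)

    def ppo_update(self, batches):
        """
        Core PPO update.
        """
        for batch in batches:
            # 1. ratio = π_new / π_old. batch["log_probs"] must come from a
            #    fresh forward pass of the current policy (with gradients);
            #    batch["log_probs_old"] is stored at rollout time, detached.
            ratio = torch.exp(batch["log_probs"] - batch["log_probs_old"])
            # 2. PPO-Clip objective
            surr1 = ratio * batch["advantages"]
            surr2 = torch.clamp(
                ratio,
                1 - self.epsilon,  # epsilon = 0.2
                1 + self.epsilon,
            ) * batch["advantages"]
            # 3. Take the elementwise minimum: the clipped term removes the
            #    incentive to push the ratio outside [1 - eps, 1 + eps].
            policy_loss = -torch.min(surr1, surr2).mean()
            # 4. Add the KL penalty. This term only affects the gradient if
            #    batch["kl_penalty"] is recomputed from the current policy;
            #    drop it if the penalty was already folded into the rewards
            #    in compute_rewards, otherwise it is counted twice.
            kl_penalty = batch["kl_penalty"].mean()
            total_loss = policy_loss + kl_penalty
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()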
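
The trainer relies on a `compute_gae` step that the article does not spell out. A minimal pure-Python sketch of generalized advantage estimation for a single response follows; it assumes per-token rewards and per-token value estimates from a critic are available, which goes beyond the two-argument call in the trainer.

```python
def compute_gae(rewards, values, gamma=1.0, lam=0.95):
    """Generalized advantage estimation for one trajectory.

    rewards: per-step rewards r_0 .. r_{T-1}
    values:  per-step value estimates V_0 .. V_{T-1}; the value after the
             final step is taken to be 0 (the episode ends with the response).
    """
    if len(rewards) != len(values):
        raise ValueError("rewards and values must have the same length")
    advantages = [0.0] * len(rewards)
    next_value = 0.0
    running = 0.0
    for t in reversed(range(len(rewards))):
        # TD residual: delta_t = r_t + gamma * V_{t+1} - V_t
        delta = rewards[t] + gamma * next_value - values[t]
        # A_t = delta_t + gamma * lam * A_{t+1}
        running = delta + gamma * lam * running
        advantages[t] = running
        next_value = values[t]
    return advantages


# Reward only on the last token, critic predicting 0.5 everywhere
print(compute_gae([0.0, 0.0, 1.0], [0.5, 0.5, 0.5], gamma=1.0, lam=1.0))
```

With `lam=1` the advantage reduces to the return minus the value estimate; with `lam=0` it reduces to the one-step TD residual.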

3.3 PPO Hyperparameters

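| Hyperparameter | Typical value | Description |
| --- | --- | --- |
| epsilon | 0.2 | PPO clip range |
| gamma | 1.0 | Discount factor (usually 1, i.e. no discounting) |
| lambda | 0.95 | GAE parameter |
| kl_coef | 0.04 | KL penalty coefficient |
| PPO epochs | 4-5 | Update passes over each batch of generated data |
| mini_batch | 1-4 | Mini-batch size |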
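
To see what the `epsilon` row does in practice, the sketch below evaluates the per-sample PPO-Clip objective for a few probability ratios with `epsilon = 0.2`. The helper name `clipped_surrogate` is illustrative.

```python
def clipped_surrogate(ratio, advantage, epsilon=0.2):
    """Per-sample PPO-Clip objective (the quantity PPO maximizes)."""
    clipped_ratio = max(1.0 - epsilon, min(ratio, 1.0 + epsilon))
    return min(ratio * advantage, clipped_ratio * advantage)


# Positive advantage: the gain is capped once the ratio exceeds 1 + epsilon
print(clipped_surrogate(1.5, 1.0))
# Negative advantage: no further gain once the ratio drops below 1 - epsilon
print(clipped_surrogate(0.5, -1.0))
# Inside the clip range the objective is the plain ratio * advantage
print(clipped_surrogate(1.1, 2.0))
```

A larger `epsilon` allows bigger policy steps per update, at the cost of less protection against a noisy advantage estimate.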

4. DPO: Direct Preference Optimization

4.1 The Core Idea of DPO

graph TB
    subgraph "RLHF (complex)"
        A["Reward Model training"] --> B["PPO policy optimization"]
        B --> C["LLM generation"]
    end
    subgraph "DPO (simple)"
        D["Preference data"] --> E["Optimize the LLM directly"]
    end

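DPO's key insight: Reward Model training and PPO optimization can be merged into a single loss function. The KL-constrained RLHF objective has a closed-form optimal policy, so the reward can be expressed through the policy itself and optimized directly on preference pairs.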
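
The merge can be written out in three steps. The block below is a sketch of the standard derivation, with notation chosen here rather than taken from the article: $\pi_\theta$ is the policy, $\pi_{\mathrm{ref}}$ the frozen reference (SFT) model, $y_w$ and $y_l$ the chosen and rejected responses, and $Z(x)$ a normalizer that depends only on the prompt.

```latex
% 1. KL-constrained RLHF objective
\max_{\pi_\theta}\;
  \mathbb{E}_{x \sim \mathcal{D},\, y \sim \pi_\theta(\cdot \mid x)}\big[ r(x, y) \big]
  - \beta\, \mathrm{KL}\big( \pi_\theta(\cdot \mid x) \,\|\, \pi_{\mathrm{ref}}(\cdot \mid x) \big)

% 2. At the optimum, the reward can be written through the policy
r(x, y) = \beta \log \frac{\pi^{*}(y \mid x)}{\pi_{\mathrm{ref}}(y \mid x)} + \beta \log Z(x)

% 3. Plugging this into the Bradley-Terry model cancels Z(x) and gives the DPO loss
\mathcal{L}_{\mathrm{DPO}}(\theta) =
  -\mathbb{E}_{(x,\, y_w,\, y_l)}\left[
    \log \sigma\!\left(
      \beta \log \frac{\pi_\theta(y_w \mid x)}{\pi_{\mathrm{ref}}(y_w \mid x)}
      - \beta \log \frac{\pi_\theta(y_l \mid x)}{\pi_{\mathrm{ref}}(y_l \mid x)}
    \right)
  \right]
```

Note that $\beta$ multiplies the log-ratios: a larger $\beta$ keeps the policy closer to the reference.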

import torch.nn.functional as F


class DPOConfig:
    def __init__(
        self,
        beta: float = 0.1,  # strength of the implicit KL constraint
        lr: float = 1e-6,
        batch_size: int = 8,
    ):
        self.beta = beta
        self.lr = lr
        self.batch_size = batch_size


def dpo_loss(policy_chosen_logps, policy_rejected_logps,
             ref_chosen_logps, ref_rejected_logps, beta=0.1):
    """
    DPO loss.

    Intuition:
    - policy_chosen - ref_chosen: how much more likely the chosen response
      is under the policy than under the reference
    - policy_rejected - ref_rejected: the same quantity for the rejected response
    Goal: make the first log-ratio exceed the second by as much as possible.
    """
    # Policy / reference log-ratios, kept in log space for stability
    chosen_logratio = policy_chosen_logps - ref_chosen_logps
    rejected_logratio = policy_rejected_logps - ref_rejected_logps
    # Implicit rewards: beta scales the log-ratio (it multiplies, not divides)
    chosen_reward = beta * chosen_logratio
    rejected_reward = beta * rejected_logratio
    # DPO formula: -log(sigmoid(chosen_reward - rejected_reward))
    loss = -F.logsigmoid(chosen_reward - rejected_reward)
    return loss.mean()
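
A worked example with plain floats can help check the direction of the loss. The sketch below assumes the standard DPO form, in which `beta` multiplies the difference of log-ratios; `dpo_loss_single` is an illustrative name and the log probabilities are made-up numbers.

```python
import math


def dpo_loss_single(policy_chosen_logp, policy_rejected_logp,
                    ref_chosen_logp, ref_rejected_logp, beta=0.1):
    """DPO loss for one preference pair, written with plain floats."""
    chosen_logratio = policy_chosen_logp - ref_chosen_logp
    rejected_logratio = policy_rejected_logp - ref_rejected_logp
    logit = beta * (chosen_logratio - rejected_logratio)
    # -log(sigmoid(logit)) == log(1 + exp(-logit))
    return math.log1p(math.exp(-logit))


# Policy identical to the reference: both log-ratios are 0, so the loss is log(2)
start = dpo_loss_single(-10.0, -12.0, -10.0, -12.0)
# Policy has raised the chosen response and lowered the rejected one
later = dpo_loss_single(-8.0, -15.0, -10.0, -12.0)
print(round(start, 4), round(later, 4))
```

Training always starts at log(2) when the policy is initialized from the reference, which makes a handy sanity check on the first logged loss value.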

4.2 DPO vs. RLHF

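| Dimension | RLHF | DPO |
| --- | --- | --- |
| Training pipeline | RM training → PPO optimization | Direct end-to-end optimization |
| Needs a Reward Model | Yes | No |
| Hyperparameters | Many (PPO epochs, epsilon, ...) | Few (mainly beta) |
| Memory footprint | High (several models loaded at once) | Lower (policy and reference only) |
| Training stability | Moderate (PPO can be unstable) | More stable |
| Quality | Commonly regarded as slightly better | Close to RLHF |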

4.3 DPO Implementation

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.utils.data import DataLoader


class DPOTrainer:
    def __init__(self, model_name, beta=0.1):
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.ref_model = AutoModelForCausalLM.from_pretrained(model_name)
        self.ref_model.eval()  # freeze the reference model
        self.ref_model.requires_grad_(False)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.beta = beta
        self.optimizer = torch.optim.AdamW(self.model.parameters(), lr=1e-6)

    def compute_log_probs(self, model, input_ids, attention_mask, label_ids):
        """Summed log probability of the response tokens, one value per sequence.

        Assumption: label_ids mirrors input_ids, with -100 at positions to
        ignore (prompt and padding tokens). DPO is defined on the sequence
        log probability, so token log probs are summed rather than averaged.
        """
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        logits = outputs.logits
        # Log probability of every vocabulary entry at every position
        log_probs = torch.log_softmax(logits, dim=-1)
        # Label shift: the logits at position t predict the token at t + 1
        labels = label_ids[:, 1:].clone()
        mask = labels != -100
        labels[~mask] = 0  # placeholder index so gather stays in range
        token_logps = log_probs[:, :-1, :].gather(-1, labels.unsqueeze(-1)).squeeze(-1)
        # Sum over response tokens only (prompt and padding are masked out)
        return (token_logps * mask).sum(dim=-1)

    def training_step(self, batch):
        # 1. Policy log probs on the chosen and rejected responses
        policy_chosen_logps = self.compute_log_probs(
            self.model,
            batch["chosen_input_ids"],
            batch["chosen_attention_mask"],
            batch["chosen_labels"],
        )
        policy_rejected_logps = self.compute_log_probs(
            self.model,
            batch["rejected_input_ids"],
            batch["rejected_attention_mask"],
            batch["rejected_labels"],
        )
        # 2. Reference model log probs (no gradients)
        with torch.no_grad():
            ref_chosen_logps = self.compute_log_probs(
                self.ref_model,
                batch["chosen_input_ids"],
                batch["chosen_attention_mask"],
                batch["chosen_labels"],
            )
            ref_rejected_logps = self.compute_log_probs(
                self.ref_model,
                batch["rejected_input_ids"],
                batch["rejected_attention_mask"],
                batch["rejected_labels"],
            )
        # 3. DPO loss (dpo_loss is the function from section 4.1)
        loss = dpo_loss(
            policy_chosen_logps, policy_rejected_logps,
            ref_chosen_logps, ref_rejected_logps,
            beta=self.beta,
        )
        # 4. Backpropagation
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
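
The trainer reads `batch["chosen_labels"]` and `batch["rejected_labels"]`, but the article does not show how they are built. One common convention, assumed here, is to copy the token ids and put an ignore marker of -100 on prompt and padding positions so that only response tokens count toward the log probability. The helper `build_example` and the token ids are hypothetical.

```python
IGNORE_INDEX = -100  # assumed marker for positions excluded from the log prob


def build_example(prompt_ids, response_ids, max_len, pad_id=0):
    """Concatenate prompt + response, right-pad, and mask non-response labels."""
    input_ids = list(prompt_ids) + list(response_ids)
    if len(input_ids) > max_len:
        raise ValueError("sequence longer than max_len")
    labels = [IGNORE_INDEX] * len(prompt_ids) + list(response_ids)
    attention_mask = [1] * len(input_ids)
    pad = max_len - len(input_ids)
    return {
        "input_ids": input_ids + [pad_id] * pad,
        "attention_mask": attention_mask + [0] * pad,
        "labels": labels + [IGNORE_INDEX] * pad,
    }


example = build_example(prompt_ids=[5, 6, 7], response_ids=[8, 9], max_len=7)
print(example["labels"])
```

The chosen and rejected examples for a pair share the same prompt ids and differ only in the response part.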

5. Practical Considerations

5.1 Data Quality Matters More Than Quantity

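# Preference data quality checks
data_quality_rules = {
    "preference_consistency": "Repeated annotations of the same prompt should agree",
    "discriminability": "Pairs the model already separates easily add little value",
    "diversity": "Rejected responses should be plausible but suboptimal, not obviously wrong",
    "length_balance": "Keep length from becoming the deciding preference factor",
}


def filter_preference_data(data):
    """Filter out low-quality preference data."""
    filtered = []
    for item in data:
        # Length filter: keeps the model from learning "longer is better"
        len_ratio = len(item["chosen"]) / max(len(item["rejected"]), 1)
        if 0.5 < len_ratio < 2.0:  # drop pairs whose lengths differ too much
            filtered.append(item)
    return filtered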
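
The filter above covers only the length rule. The consistency rule can be checked in a similar way when each comparison has been labeled by several annotators. A minimal sketch, assuming every comparison carries a list of votes; the field names and the 0.75 threshold are illustrative choices, not values from the article.

```python
from collections import Counter


def agreement_rate(votes):
    """Fraction of annotators who picked the majority option.

    votes: labels for one comparison, e.g. ["A", "A", "B"].
    """
    if not votes:
        raise ValueError("no votes")
    majority_count = Counter(votes).most_common(1)[0][1]
    return majority_count / len(votes)


def keep_consistent(comparisons, min_agreement=0.75):
    """Keep comparisons whose annotators mostly agree."""
    return [c for c in comparisons if agreement_rate(c["votes"]) >= min_agreement]


comparisons = [
    {"id": 1, "votes": ["A", "A", "A", "B"]},  # 3 of 4 agree
    {"id": 2, "votes": ["A", "B", "A", "B"]},  # split vote
]
print([c["id"] for c in keep_consistent(comparisons)])
```

Split votes are not always noise: they can also flag prompts where the two responses are equally good, which is worth reviewing before discarding.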

5.2 Training Stability Tips

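import torch

# load_large_preference_dataset, reward_model, optimizer, dataloader,
# num_epochs, model and evaluate_reward_margin are placeholders for objects
# defined in your own training script.

# 1. Reward Model pretraining
# Train the reward model on a large preference dataset first
rm_pretrain_data = load_large_preference_dataset()
reward_model.pretrain(rm_pretrain_data)

# 2. Learning-rate schedule
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=len(dataloader) * num_epochs,
    eta_min=1e-7,
)

# 3. Gradient clipping (call after loss.backward(), before optimizer.step())
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

# 4. Early stopping
best_reward_margin = 0
patience = 3
patience_counter = 0
for epoch in range(num_epochs):
    # ... run one training epoch here ...
    val_margin = evaluate_reward_margin(model)
    if val_margin > best_reward_margin + 0.01:
        best_reward_margin = val_margin
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            break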
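
The early-stopping loop depends on `evaluate_reward_margin`, which the article leaves undefined. One plausible reading is the mean gap between the rewards of chosen and rejected responses on a validation set, often reported together with pairwise accuracy. The sketch below assumes those rewards have already been computed (for DPO, as `beta` times the policy/reference log-ratio); the function name and the numbers are illustrative.

```python
def reward_margin_stats(chosen_rewards, rejected_rewards):
    """Mean reward margin and pairwise accuracy over a validation set."""
    if len(chosen_rewards) != len(rejected_rewards) or not chosen_rewards:
        raise ValueError("need two non-empty lists of equal length")
    margins = [c - r for c, r in zip(chosen_rewards, rejected_rewards)]
    mean_margin = sum(margins) / len(margins)
    # A pair counts as correct when the chosen response scores higher
    accuracy = sum(m > 0 for m in margins) / len(margins)
    return mean_margin, accuracy


mean_margin, accuracy = reward_margin_stats(
    chosen_rewards=[1.0, 0.5, -0.25, 2.0],
    rejected_rewards=[0.0, 1.0, -1.25, 0.5],
)
print(mean_margin, accuracy)
```

A margin that keeps growing while accuracy stays flat can be a sign of overfitting to pairs the model already ranks correctly.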

5.3 The Alignment Tax

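import random


# Alignment tax: after RLHF the model can regress on some tasks
# Mitigation: mix pretraining data back in
class AlignmentTaxMitigator:
    def mix_pretraining(self, rlhf_dataloader, sample_pretrain_batch, ratio=0.1):
        """
        Mix a share of pretraining data into the RLHF training data.
        rlhf_dataloader: iterable of RLHF batches
        sample_pretrain_batch: callable returning one pretraining batch
        ratio: pretraining batches added per RLHF batch (typically 5-15%);
               the resulting share of the mix is ratio / (1 + ratio)
        """
        mixed_dataloader = []
        for batch in rlhf_dataloader:
            # Every RLHF batch is kept
            mixed_dataloader.append(batch)
            # With probability `ratio`, follow it with a pretraining batch
            if random.random() < ratio:
                pretrain_batch = sample_pretrain_batch()
                mixed_dataloader.append(pretrain_batch)
        return mixed_dataloader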
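
When a pretraining batch is appended with probability `ratio` after each RLHF batch, the pretraining share of the final mix is slightly below `ratio`. The arithmetic below converts between the two quantities; both helper names are hypothetical.

```python
def expected_share(ratio):
    """Expected pretraining share when one batch is appended with probability `ratio`."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be a probability")
    return ratio / (1.0 + ratio)


def append_probability(target_share):
    """Append probability needed so pretraining batches make up
    `target_share` of the mix in expectation (at most 0.5 with this scheme)."""
    if not 0.0 <= target_share <= 0.5:
        raise ValueError("target_share must be in [0, 0.5]")
    return target_share / (1.0 - target_share)


print(round(expected_share(0.1), 4))      # share obtained with ratio = 0.1
print(round(append_probability(0.1), 4))  # ratio needed for a 10% share
```

For the 5-15% range the difference is small, but it grows quickly for larger ratios.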

6. Summary

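| Method | Strengths | Weaknesses |
| --- | --- | --- |
| RLHF | Widely regarded as giving the best results | Complex pipeline, unstable training |
| DPO | Simple and stable, no RM needed | Results slightly below RLHF |
| RLAIF | Does not depend on human annotation | Results can be inconsistent |
| KTO | Simple, results comparable to DPO | Newer method, less practical experience |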
graph TB
    A["Choosing a preference alignment method"] --> B{"Is a mature RM available?"}
    B -->|Yes| C["RLHF"]
    B -->|No| D{"Are resources limited?"}
    D -->|Yes| E["DPO"]
    D -->|No| F["RLAIF / KTO"]


RLHF and DPO Preference Alignment Techniques
https://blog.souloss.com/posts/ai-engineering/rlhf/
Author
Souloss
Published
2025-02-16
License
CC BY-NC-SA 4.0
