0. 简介

随着chatgpt的爆火,最近也有很多大模型在不断地出现,比如说Bloom系列以及以LLAMA为基础的ziya和baichuan。这些模型相较于chatglm来说,更加具有发展前景,因为其是完全可商用,并可以不断迭代更新的。最近作者在跟着hiyouga大佬的LLaMA-Efficient-Tuning进行学习,相较于其他的项目来说,该项目是非常适合跟着学习并入门的。

1. RHLF

RHLF主要分成了两个部分,其一是奖励模型训练,其二就是强化学习的步骤

1.1 奖励模型训练 (Rewad Model, RM)

从去掉最后的取消嵌入层的SFT模型开始,训练了一个模型,输入一个提示和回答,并输出一个奖励的数值,这就是RM的输入输出。GPT3.5论文中表示只使用6B RM。因为这可以节省大量计算量,并且他们发现175B RM的训练可能不稳定,所以不适合作为RM的模型,这也是合情合理的,因为本身判断输出的好坏并不需要庞大的模型,反而是小模型就可以达到预期的结果。

1.2 强化学习训练

强化学习和预训练模型是最近两年最为火热的AI方向之二,之前不少科研工作者说强化学习并不是一个非常适合应用到预训练模型中,因为很难通过模型的输出内容建立奖励机制。而InstructGPT/ChatGPT反直觉的做到了这点,它通过结合人工标注,将强化学习引入到预训练语言模型是这个算法最大的创新点。

2. 代码阅读–train_rm.py&pairwise.py

在reward部分,与之前讲的PT和SFT相比唯一不同的就是使用了PairwisePeftTrainer这个函数

    trainer = PairwisePeftTrainer(
        finetuning_args=finetuning_args,
        model=model,
        args=training_args,
        tokenizer=tokenizer,
        data_collator=data_collator,
        callbacks=[LogCallback()],
        compute_metrics=compute_accuracy,
        **trainer_kwargs
    )

PairwisePeftTrainer的类,它继承自PeftTrainer类。该类用于计算成对损失。

class PairwisePeftTrainer(PeftTrainer):
    r"""
    Inherits PeftTrainer to compute pairwise loss.
    """

    def __init__(self, *args, **kwargs):#类的构造函数。它接受任意数量的位置参数和关键字参数
        super().__init__(*args, **kwargs)#super()函数获取父类,并调用其构造函数
        self.can_return_loss = True # override property to return eval_loss

    def compute_loss(self, model, inputs, return_outputs=False):
        r"""
        Computes pairwise loss. The first n examples are chosen and the last n examples are rejected.

        We use score on the EOS token to represent reward of the whole sentence.

        Subclass and override to inject custom behavior. It should not be directly used by external scripts.
        """
        batch_size = inputs["input_ids"].size(0) // 批次大小
        _, _, values = model(**inputs)#使用模型对输入数据进行前向传播,获取模型的输出
        r_accept, r_reject = values[:, -1].split(batch_size, dim=0)#将模型输出的值分割为两部分,r_accept和r_reject。分割的维度为0,即按行分割。
        loss = -torch.log(torch.sigmoid(r_accept - r_reject)).mean()#计算损失,使用torch.log和torch.sigmoid函数计算r_accept和r_reject的差值的sigmoid函数值的负对数的均值
        return (loss, torch.stack((r_accept, r_reject), dim=-1)) if return_outputs else loss#如果return_outputs为True,则返回损失和r_accept、r_reject的堆叠。否则,只返回损失。

3. 代码阅读—train_ppo.py

这段代码主要是用来进行PPO(Proximal Policy Optimization)训练的。首先,代码中的prepare_args()函数用来准备预训练模型和数据集的参数。然后,prepare_data()函数用这些参数来准备数据集。接着,load_pretrained()函数加载预训练模型和分词器。之后,preprocess_data()函数对数据集进行预处理,包括将输入和输出分开。这部分基本内容保持一致

    # 准备预训练模型和数据集
    model_args, data_args, training_args, finetuning_args = prepare_args(stage="ppo")
    dataset = prepare_data(model_args, data_args)
    model, tokenizer = load_pretrained(model_args, finetuning_args, training_args.do_train, stage="ppo")
    dataset = preprocess_data(dataset, tokenizer, data_args, training_args, stage="ppo")
    data_collator = DynamicDataCollatorWithPadding(tokenizer)#处理数据集,将数据集中的输入和输出分开

接下来,代码中定义了一个PPOConfig对象,用来设置PPO算法的参数,包括模型名字、学习率、批大小、梯度累积步数等等。

    ppo_config = PPOConfig(
        model_name=model_args.model_name_or_path,
        learning_rate=training_args.learning_rate,
        mini_batch_size=training_args.per_device_train_batch_size,
        batch_size=training_args.per_device_train_batch_size,
        gradient_accumulation_steps=training_args.gradient_accumulation_steps,
        ppo_epochs=1,
        max_grad_norm=training_args.max_grad_norm
    )#ppo的参数

然后,代码创建了一个AdamW优化器,用来优化模型的参数。同时,通过调用get_scheduler()函数创建了一个学习率调度器,用来控制优化器的学习率。

optimizer = AdamW(filter(lambda p: p.requires_grad, model.parameters()), lr=ppo_config.learning_rate)
    total_train_batch_size = \
        training_args.per_device_train_batch_size * training_args.gradient_accumulation_steps * training_args.world_size
    lr_scheduler = get_scheduler(
        training_args.lr_scheduler_type,
        optimizer=optimizer,
        num_warmup_steps=training_args.warmup_steps,
        num_training_steps=(training_args.num_train_epochs * math.ceil(len(dataset) / total_train_batch_size))
    )#学习率调度器

接下来,代码初始化了一个PPOPeftTrainer对象,用来进行PPO训练。这个对象接收了训练参数、微调参数、回调函数、PPO配置、模型、分词器、数据集、数据处理器、优化器和学习率调度器等等。(这部分我们下一节详细来看)

    # Initialize our Trainer
    ppo_trainer = PPOPeftTrainer(
        training_args=training_args,
        finetuning_args=finetuning_args,
        callbacks=[LogCallback()],
        config=ppo_config,
        model=model,
        ref_model=None,
        tokenizer=tokenizer,
        dataset=dataset,
        data_collator=data_collator,
        optimizer=optimizer,
        lr_scheduler=lr_scheduler
    )

然后,代码调用ppo_train()函数来进行PPO训练,并调用save_model()函数和save_state()函数来保存训练好的模型和状态。最后,如果是主进程且设置了plot_loss为True,代码会调用plot_loss()函数来绘制损失函数和奖励函数的曲线图。

    ppo_trainer.ppo_train(max_target_length=data_args.max_target_length)#ppo训练
    ppo_trainer.save_model()#保存模型
    ppo_trainer.save_state() # 必须要在save_model之后调用,否则会报错
    if ppo_trainer.is_world_process_zero() and model_args.plot_loss:
        plot_loss(training_args.output_dir, keys=["loss", "reward"])

4. 代码阅读—ppo.py

这个文档中的代码是一个用于训练的PPO Trainer类,包含了替换模型的valuehead、转换模型的层归一化参数数据类型、PPO阶段的训练循环、生成模型响应和保存模型检查点等功能

首先replace_model用于替换模型的valuehead。根据目标值(”default”或”reward”),保存原始的valuehead权重和偏置,并将模型的valuehead替换为指定的适配器。

'''
替换模型的valuehead
'''
def replace_model(model: AutoModelForCausalLMWithValueHead, target: Literal["default", "reward"]) -> None:
    if target == "reward": # save original head temporarily # 如果目标值为"reward",执行下面的代码块。
        valuehead_state_dict = model.v_head.state_dict()#获取模型的v_head属性(valuehead)的权重,并保存在变量valuehead_state_dict中

        setattr(model, "origin_head_weight", valuehead_state_dict["summary.weight"])#保存权重
        setattr(model, "origin_head_bias", valuehead_state_dict["summary.bias"])#保存偏置

    model.pretrained_model.set_adapter(target) # 将LoRA适配器设置为活动状态
    model.v_head.load_state_dict({
        "summary.weight": getattr(model, "{}_head_weight".format(target)),
        "summary.bias": getattr(model, "{}_head_bias".format(target))
    })#将指定适配器的权重和偏置加载到模型的v_head属性(valuehead)中

cast_layernorm_dtype函数用于将模型的层归一化参数转换为指定的数据类型(半精度或全精度)。根据给定的层归一化参数名称列表和参数值,将模型的层归一化参数转换为指定的数据类型。

'''
将模型的valuehead恢复为原始的valuehead
'''
def cast_layernorm_dtype(
        model: AutoModelForCausalLMWithValueHead,
        layer_norm_names: List[str] = ["norm", "ln_f"], # 用于LLaMA和BLOOM设置
        layer_norm_params: Optional[Dict[str, torch.Tensor]] = None
) -> Tuple[AutoModelForCausalLMWithValueHead, Dict[str, torch.Tensor]]:

    layer_norm_state_dict = {}#存储float32类型变量的权重

    for name, param in model.named_parameters():#遍历模型的命名参数,找到属于valuehead的层归一化参数
        if param.ndim == 1 and any(layer_norm_name in name for layer_norm_name in layer_norm_names):
            if layer_norm_params is not None:
                param.data = layer_norm_params[name] # 如果提供了layer_norm_params参数,说明需要将参数恢复为float32类型的权重
            else:
                layer_norm_state_dict[name] = param.data.detach().clone()
                param.data = param.data.to(torch.float16)#如果没有提供layer_norm_params参数,说明需要将参数转换为float16类型的权重

    return model, layer_norm_state_dict

这个文件中主要的核心内容就是PPOPeftTrainer(PPOTrainer, PeftTrainer)这个类,它继承自PPOTrainer和PeftTrainer类,用于实现PPO阶段的训练循环。包括初始化方法、ppo_train方法、generate方法和save_model方法。

class PPOPeftTrainer(PPOTrainer, PeftTrainer):
    r"""
    继承了PPOTrainer。
    """

    def __init__(
            self,
            training_args: Seq2SeqTrainingArguments,
            finetuning_args: FinetuningArguments,
            callbacks: List[LogCallback],
            **kwargs
    ):
        PPOTrainer.__init__(self, **kwargs)
        self.args = training_args
        self.finetuning_args = finetuning_args
        self.log_callback = callbacks[0]
        self.state = TrainerState()
        self.data_collator = self.accelerator.prepare(kwargs["data_collator"]) # override the data collator of PPOTrainer

ppo_train函数实现了PPO阶段的训练循环。根据配置参数,计算训练批次大小、每个epoch的步数、数据集大小和最大步数。然后根据数据迭代器获取批次数据,并进行模型训练和更新。在每个epoch结束时,打印训练日志并保存模型检查点。这里面是会调用generate函数来完成回复信息,然后再使用训练好的reward来评估整个模型

    def ppo_train(self, max_target_length: int) -> None:
        r"""
        实现PPO阶段的训练循环,如Huggingface的训练器中的_inner_training_loop()。
        """
        total_train_batch_size = self.config.batch_size * self.config.gradient_accumulation_steps * self.args.world_size# 计算总的训练批次大小
        len_dataloader = len(self.dataloader)#获取数据加载器的长度,即数据集的批次数
        num_steps_per_epoch = max(len_dataloader // self.config.gradient_accumulation_steps, 1)# 计算每个epoch的步数
        num_examples = len(self.dataset)# 计算数据集的大小
        num_train_epochs = self.args.num_train_epochs# 训练的epoch数
        max_steps = math.ceil(num_train_epochs * num_steps_per_epoch)# 计算最大步数

        self.state.max_steps = max_steps
        self.state.num_train_epochs = num_train_epochs#将训练的epoch数保存到训练状态中
        self.state.is_local_process_zero = self.is_local_process_zero()#是否将本地进程的主进程的布尔值保存到训练状态中
        self.state.is_world_process_zero = self.is_world_process_zero()#是否将全局进程的主进程的布尔值保存到训练状态中

        if self.is_world_process_zero():# 如果是主进程
            logger.info("***** Running training *****")
            logger.info(f"  Num examples = {num_examples}")
            logger.info(f"  Num Epochs = {num_train_epochs}")
            logger.info(f"  Instantaneous batch size per device = {self.config.batch_size}")
            logger.info(f"  Total train batch size (w. parallel, distributed & accumulation) = {total_train_batch_size}")
            logger.info(f"  Gradient Accumulation steps = {self.config.gradient_accumulation_steps}")
            logger.info(f"  Total optimization steps = {max_steps}")
            logger.info(f"  Number of trainable parameters = {sum(p.numel() for p in self.model.parameters() if p.requires_grad)}")

        # model.generate 的关键字参数
        gen_kwargs = {
            "top_k": 0.0,
            "top_p": 1.0,
            "do_sample": True,
            "pad_token_id": self.tokenizer.pad_token_id,
            "eos_token_id": self.tokenizer.eos_token_id,
            "logits_processor": get_logits_processor()
        }
        output_length_sampler = LengthSampler(max_target_length // 2, max_target_length)# 用于生成长度的采样器
        unwrapped_model: PreTrainedModel = self.accelerator.unwrap_model(self.model)# 获取解包后的模型


        dataiter = iter(self.dataloader)# 获取数据迭代器,获取解包后的模型
        steps_trained = 0
        loss_meter = AverageMeter()# 用于计算平均损失
        reward_meter = AverageMeter()# 用于计算平均奖励

        for step in tqdm(range(max_steps), disable=not self.is_world_process_zero()):#进行训练循环,循环次数为max_steps

            for _ in range(self.config.gradient_accumulation_steps):#梯度累积操作

                batch = next(dataiter)#获取一个批次的训练数据
                steps_trained += 1#将已训练的步数加一

                unwrapped_model.gradient_checkpointing_disable()# 禁用梯度检查点,以减少内存消耗和加速训练
                unwrapped_model.config.use_cache = True# 使用缓存,以减少内存消耗和加速训练

                # 从模型中获取查询和生成的回复
                query_tensors: torch.Tensor = batch["input_ids"]#从批次数据中获取查询的张量
                response_tensors = self.generate(batch, length_sampler=output_length_sampler, return_prompt=False, **gen_kwargs)# 过调用self.generate方法生成回复的张量。

                #分别创建空列表用于存储查询和回复的张量
                queries: List[torch.Tensor] = []
                responses: List[torch.Tensor] = []
                for i in range(len(query_tensors)):
                    query_length = (query_tensors[i] != self.tokenizer.pad_token_id).nonzero()[0]# 获取查询的长度,即非填充标记的第一个索引位置
                    response_length = (response_tensors[i] != self.tokenizer.pad_token_id).nonzero()[-1] + 1# 获取回复的长度,即非填充标记的最后一个索引位置加一
                    queries.append(query_tensors[i, query_length:]) # 将去除左边填充的查询张量添加到queries列表中
                    if response_length < 2:  # 如果回复的长度小于2,即只有一个标记
                        responses.append(response_tensors.new_empty(2).fill_(self.tokenizer.eos_token_id))#创建一个长度为2的新张量,并用结束标记填充,然后将其添加到responses列表中。
                    else:
                        responses.append(response_tensors[i, :response_length]) # 将去除右边填充的回复张量添加到responses列表中

                # 计算奖励
                replace_model(unwrapped_model, target="reward")# 替换模型的valuehead为奖励适配器
                _, _, values = self.model(**self.prepare_model_inputs(queries, responses))# 使用模型计算查询和回复的奖励
                rewards = [reward for reward in values[:, -1].to(torch.float32)] # 将奖励转换为float32类型,并存储在rewards列表中
                replace_model(unwrapped_model, target="default") # 将模型的valuehead恢复为原始的valuehead

                # 跑PPO模型
                unwrapped_model.gradient_checkpointing_enable()#启用梯度检查点
                unwrapped_model.config.use_cache = False#用缓存

                stats = self.step(queries, responses, rewards)#根据奖励,模型输出状态

                loss_meter.update(stats["ppo/loss/total"], n=len(rewards))#根据奖励值和模型的查询和回复进行一步PPO模型的训练,并获取训练统计信息
                reward_meter.update(torch.stack(rewards).mean().item(), n=len(rewards))# 将每个批次的损失添加到损失计量器中

                if steps_trained == len_dataloader:
                    dataiter = iter(self.dataloader)# 重置数据迭代器
                    steps_trained = 0

            if self.is_world_process_zero() and (step+1) % self.args.logging_steps == 0:# 打印日志
                logs = {
                    "loss": round(loss_meter.avg, 4),
                    "reward": round(reward_meter.avg, 4),
                    "learning_rate": stats["ppo/learning_rate"],
                    "epoch": round(step / num_steps_per_epoch, 2)
                }
                print(logs)
                logs["step"] = step
                self.state.log_history.append(logs)
                self.log_callback.on_log(self.args, self.state, None)
                loss_meter.reset()
                reward_meter.reset()

            if (step+1) % self.args.save_steps == 0: # save checkpoint
                self.save_model(os.path.join(self.args.output_dir, f"checkpoint-{step+1}"))

generate函数则是生成给定查询的模型响应。首先将模型转换为半精度,并根据生成配置参数生成模型的响应。然后将模型转换回全精度,并返回生成的响应。

 @torch.no_grad()
    def generate(
            self,
            inputs: Dict[str, torch.Tensor],
            length_sampler: Optional[Callable] = None,
            return_prompt: Optional[bool] = True,
            **generation_kwargs,
    ) -> torch.Tensor:
        r"""
        生成给定查询的模型响应。
        子类化并覆盖以注入自定义行为。
        """
        self.model, layer_norm_params = cast_layernorm_dtype(self.model)# 将模型转换为半精度

        if length_sampler is not None:
            generation_kwargs["max_new_tokens"] = length_sampler()# 生成最大长度

        unwrapped_model = self.accelerator.unwrap_model(self.model)# 获取模型

        response = unwrapped_model.generate(**inputs, **generation_kwargs)# 生成回复

        # 临时hack,以确保生成配置不会在评估循环的每个迭代中初始化
        #灵感来源:https://github.com/huggingface/transformers/blob/v4.28.1/src/transformers/trainer_seq2seq.py#L273
        if unwrapped_model.pretrained_model.generation_config._from_model_config:
            unwrapped_model.pretrained_model.generation_config._from_model_config = False# 重置生成配置

        self.model, _ = cast_layernorm_dtype(self.model, layer_norm_params)# 将模型转换为全精度

        if not return_prompt and not self.is_encoder_decoder:# 如果不返回提示并且不是编码器解码器
            return response[:, inputs["input_ids"].size(1):]# 返回回复
        return response

最后一个save_model函数则是保存模型检查点。根据配置参数,保存模型的检查点文件。

    def save_model(self, output_dir: Optional[str] = None) -> None:
        r"""
        保存模型检查点。
        子类化并覆盖以注入自定义行为。
        """
        if self.args.should_save:
            self._save(output_dir)