引言

在学习强化学习的过程中,我们需要通过实践来巩固自己对于强化学习算法的理解。但是在应用阶段,采用工业界成熟的算法库往往是更为明智的选择,目前强化学习主流的算法工具由stable_baseline3,rllib等。今天我们就来针对sb3工具库,来讲解下如何快速搭建强化学习模型。

首先定义一个任务背景:

使用深度强化学习对六足机器人进行运动控制,其中模型的输入为机器人前进方向,和运动速度,18个关节角的速度以及角度;输出为18个关节的角度值。

1、环境安装

安装pytorch

pip install torch

安装强化学习库stable_baselines

pip install stable_baselines3

针对需要自己搭建环境的用户来说,gym模块也是必不可少的,因为stable_baseline中的强化学习环境就是针对gym框架进行开发的

pip install gym

2、环境搭建

基于gym的环境模型一般都可以写成这样:

# _*_coding:utf-8-*-
import sys
import gym
from sympy import *
import math
import numpy as np
gym.logger.set_level(40)
# sys.path.append('这里写其上层文件见的绝对路径,如'~/autodl-nas/robot/'')
# import Params
class RobotEnv(gym.Env):
    # 初始化参数
    def __init__(self):
        # 状态空间为18(关节角度)+18(关节角速度)+2(控制方向)+1(控制速度)=39
        self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(39,))
        # 动作空间为18(关节角度)
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(18,))
        # 附加功能,可选,对应第35行
        # self.reward_fun = Params.PARAMS_ENV['reward_fun']

    # 获取原始数据,并生成状态,同时更新done
    def get_state(self):
        origin_data = [这里写获取数据的函数]
        tmp_data = [将原始数据按照要求组成状态,注意角度\角速度\方向和速度归一化到[-1,-1],速度可以除以系数归一化到0-1]]
        state = np.array(tmp_data).reshape(1,39)
        self.done = True if[写判断终止条件]else False
        return state

    # 根据强化学习输出,发送控制指令,控制机器人运动
    def move(self, action):
        [这里将动作对应的控制指令发送给仿真环境中的机器人]
        pass

    # 奖励函数
    def get_reward(self):
        reward = [根据任务需求定义奖励函数,建议三个方面:1、存活时间长短(即能否满足站立并运动的要求)2、方向是否为给定方向3、速度是否为给定速度]
        # reward = self.reward_fun#如果使用这个,就不需要上面个这句,上面这句就可以放到参数文件中进行定义
        return reward

    # 主程序
    def step(self, action):
        # 执行动作
        self.move(action)
        # 获取动作对应奖励
        reward = self.get_reward()
        # 获取下一状态
        state = self.get_state()
        # 返回
        return state, reward, self.done, {}

    # 重置环境
    def reset(self):
        [初始化机器人]
        # 获得对应状态
        state = self.get_state()
        return state

    # 关闭机器人
    def close(self):
        [关闭机器人]

在状态设置方面,由于都是连续动作和状态,因此使用gym.spaces.box进行设置。在状态设置上,我倾向于将最终输送给网络的状态使用一个单独的函数进行整合,同时获取奖励的函数也是如此,如果你需要与仿真环境交互,还可以设置一个move函数。

3、模型训练

在使用sb3尽心模型训练的时候,只需要根据参数要求,对参数进行设置即可:

import numpy as np
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import random
import argparse
import GymEnv
import os
import Params
def fixed_seed(i):
     random.seed(i)
     os.environ['PYTHONHASHSEED'] = str(i)  # 为了禁止hash随机化,使得实验可复现
     np.random.seed(i)
     torch.manual_seed(i)
     torch.cuda.manual_seed(i)
     torch.cuda.manual_seed_all(i)  # if you are using multi-GPU.
     torch.backends.cudnn.benchmark = False
     torch.backends.cudnn.deterministic = True


if __name__=='__main__':
     parser = argparse.ArgumentParser()
     parser.add_argument("--name", required=True, help="Name of the this train")
     args = parser.parse_args()
     fixed_seed(0)#固定随机种子
     params = Params.PARAMS_PPO
     env = make_vec_env("robot-v0",seed = 0,n_envs=params['n_envs'])#建立环境
     policy_kwargs =(#网络设置
     dict(activation_fn=params['active'],
          net_arch=[dict(pi=params['net_pi'],
          vf=params['net_vf'])]))
     model = PPO(
          "MlpPolicy",#mpl神经网络
          env,
          gamma=params['gamma'],
          learning_rate=params['learning_rate'],
          n_steps=params['n_steps'],
          batch_size=params['batch_size'],
          n_epochs=params['n_epochs'],
          policy_kwargs=policy_kwargs,
          verbose=1,#需要打印训练信息
          tensorboard_log="./tf-logs/")#tensorboard文件存放地址
     model.learn(
          total_timesteps=params['total_timesteps'],
          n_eval_episodes=params['n_eval_episodes'])
     model.save('%s.pkl'%args.name)

在这里,我同样推荐将参数使用另外的文件保存的方式,这样也有利于修改,同时可以结合之前博客中写的函数指针进行设置函数指针应用。在代码中,我还设置了随机种子,方便每次训练时可以复现,同时更好得比较每次参数修改的效果。在这里我们使用了代码中方式定义网络,如果需要自定义网络,比如设置不同的层和激活函数,可以使用自定义策略函数。这里的类其实是BaseFeaturesExtractor的子类,例如我的代码中:

class CustomMLP(BaseFeaturesExtractor):
    """
    :param observation_space: (gym.Space)
    :param features_dim: (int) Number of features extracted.
        This corresponds to the number of unit for the last layer.
    """
    def __init__(self, observation_space: gym.spaces.Box,features_dim:int):
        super(CustomMLP, self).__init__(observation_space, features_dim)
        n_input = observation_space.shape[0]
        self.net = th.nn.Sequential(
            th.nn.Linear(n_input, 17),
            th.nn.LeakyReLU(),
            th.nn.Linear(17,35),
            th.nn.LeakyReLU(),
        )
    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.net(observations)

可以使用CustomMLP来对网络部分进行自定义。

4、测试

这里搭建了一个简单的环境,主要为了测试模板是否可用,其中设置如下:

env:

# _*_coding:utf-8-*-
import sys
import gym
from sympy import *
import math
import numpy as np
gym.logger.set_level(40)
# sys.path.append('~/autodl-nas/robot/')
# import Params
class RobotEnv(gym.Env):
    # 初始化参数
    def __init__(self):
        # 状态空间为18(关节角度)+18(关节角速度)+2(控制方向)+1(控制速度)=39
        self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(39,))
        # 动作空间为18(关节角度)
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(18,))
        self.t = 1

    # 获取原始数据,并生成状态,同时更新done
    def get_state(self):
        state = np.array(self.observation_space.sample()).reshape(1,39)
        self.t = self.t+1
        self.done = True if self.t==50 else False
        return state

    # 奖励函数
    def get_reward(self):
        if self.t == 10:
            reward =  1
        else:
            reward = 0
        return reward

    # 主程序
    def step(self, action):
        reward = self.get_reward()
        state = self.get_state()
        # print(self.t,self.done,reward)
        return state, reward, self.done, {}

    # 重置环境
    def reset(self):
        self.t = 1
        state = self.get_state()
        return state

    # 关闭机器人
    def close(self):
        pass
if __name__=='__main__':
    U=RobotEnv()
    for i in range(2):
        U.reset()
        while not U.done:
            U.step(0)

训练后得到如下结果:

May the force be with you!