引言
在学习强化学习的过程中,我们需要通过实践来巩固自己对于强化学习算法的理解。但是在应用阶段,采用工业界成熟的算法库往往是更为明智的选择,目前强化学习主流的算法工具由stable_baseline3,rllib等。今天我们就来针对sb3工具库,来讲解下如何快速搭建强化学习模型。
首先定义一个任务背景:
使用深度强化学习对六足机器人进行运动控制,其中模型的输入为机器人前进方向,和运动速度,18个关节角的速度以及角度;输出为18个关节的角度值。
1、环境安装
安装pytorch
pip install torch
安装强化学习库stable_baselines
pip install stable_baselines3
针对需要自己搭建环境的用户来说,gym模块也是必不可少的,因为stable_baseline中的强化学习环境就是针对gym框架进行开发的
pip install gym
2、环境搭建
基于gym的环境模型一般都可以写成这样:
# _*_coding:utf-8-*-
import sys
import gym
from sympy import *
import math
import numpy as np
gym.logger.set_level(40)
# sys.path.append('这里写其上层文件见的绝对路径,如'~/autodl-nas/robot/'')
# import Params
class RobotEnv(gym.Env):
# 初始化参数
def __init__(self):
# 状态空间为18(关节角度)+18(关节角速度)+2(控制方向)+1(控制速度)=39
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(39,))
# 动作空间为18(关节角度)
self.action_space = gym.spaces.Box(low=-1, high=1, shape=(18,))
# 附加功能,可选,对应第35行
# self.reward_fun = Params.PARAMS_ENV['reward_fun']
# 获取原始数据,并生成状态,同时更新done
def get_state(self):
origin_data = [这里写获取数据的函数]
tmp_data = [将原始数据按照要求组成状态,注意角度\角速度\方向和速度归一化到[-1,-1],速度可以除以系数归一化到0-1]]
state = np.array(tmp_data).reshape(1,39)
self.done = True if[写判断终止条件]else False
return state
# 根据强化学习输出,发送控制指令,控制机器人运动
def move(self, action):
[这里将动作对应的控制指令发送给仿真环境中的机器人]
pass
# 奖励函数
def get_reward(self):
reward = [根据任务需求定义奖励函数,建议三个方面:1、存活时间长短(即能否满足站立并运动的要求)2、方向是否为给定方向3、速度是否为给定速度]
# reward = self.reward_fun#如果使用这个,就不需要上面个这句,上面这句就可以放到参数文件中进行定义
return reward
# 主程序
def step(self, action):
# 执行动作
self.move(action)
# 获取动作对应奖励
reward = self.get_reward()
# 获取下一状态
state = self.get_state()
# 返回
return state, reward, self.done, {}
# 重置环境
def reset(self):
[初始化机器人]
# 获得对应状态
state = self.get_state()
return state
# 关闭机器人
def close(self):
[关闭机器人]
在状态设置方面,由于都是连续动作和状态,因此使用gym.spaces.box进行设置。在状态设置上,我倾向于将最终输送给网络的状态使用一个单独的函数进行整合,同时获取奖励的函数也是如此,如果你需要与仿真环境交互,还可以设置一个move函数。
3、模型训练
在使用sb3尽心模型训练的时候,只需要根据参数要求,对参数进行设置即可:
import numpy as np
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
import random
import argparse
import GymEnv
import os
import Params
def fixed_seed(i):
random.seed(i)
os.environ['PYTHONHASHSEED'] = str(i) # 为了禁止hash随机化,使得实验可复现
np.random.seed(i)
torch.manual_seed(i)
torch.cuda.manual_seed(i)
torch.cuda.manual_seed_all(i) # if you are using multi-GPU.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
if __name__=='__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--name", required=True, help="Name of the this train")
args = parser.parse_args()
fixed_seed(0)#固定随机种子
params = Params.PARAMS_PPO
env = make_vec_env("robot-v0",seed = 0,n_envs=params['n_envs'])#建立环境
policy_kwargs =(#网络设置
dict(activation_fn=params['active'],
net_arch=[dict(pi=params['net_pi'],
vf=params['net_vf'])]))
model = PPO(
"MlpPolicy",#mpl神经网络
env,
gamma=params['gamma'],
learning_rate=params['learning_rate'],
n_steps=params['n_steps'],
batch_size=params['batch_size'],
n_epochs=params['n_epochs'],
policy_kwargs=policy_kwargs,
verbose=1,#需要打印训练信息
tensorboard_log="./tf-logs/")#tensorboard文件存放地址
model.learn(
total_timesteps=params['total_timesteps'],
n_eval_episodes=params['n_eval_episodes'])
model.save('%s.pkl'%args.name)
在这里,我同样推荐将参数使用另外的文件保存的方式,这样也有利于修改,同时可以结合之前博客中写的函数指针进行设置函数指针应用。在代码中,我还设置了随机种子,方便每次训练时可以复现,同时更好得比较每次参数修改的效果。在这里我们使用了代码中方式定义网络,如果需要自定义网络,比如设置不同的层和激活函数,可以使用自定义策略函数。这里的类其实是BaseFeaturesExtractor的子类,例如我的代码中:
class CustomMLP(BaseFeaturesExtractor):
"""
:param observation_space: (gym.Space)
:param features_dim: (int) Number of features extracted.
This corresponds to the number of unit for the last layer.
"""
def __init__(self, observation_space: gym.spaces.Box,features_dim:int):
super(CustomMLP, self).__init__(observation_space, features_dim)
n_input = observation_space.shape[0]
self.net = th.nn.Sequential(
th.nn.Linear(n_input, 17),
th.nn.LeakyReLU(),
th.nn.Linear(17,35),
th.nn.LeakyReLU(),
)
def forward(self, observations: th.Tensor) -> th.Tensor:
return self.net(observations)
可以使用CustomMLP来对网络部分进行自定义。
4、测试
这里搭建了一个简单的环境,主要为了测试模板是否可用,其中设置如下:
env:
# _*_coding:utf-8-*-
import sys
import gym
from sympy import *
import math
import numpy as np
gym.logger.set_level(40)
# sys.path.append('~/autodl-nas/robot/')
# import Params
class RobotEnv(gym.Env):
# 初始化参数
def __init__(self):
# 状态空间为18(关节角度)+18(关节角速度)+2(控制方向)+1(控制速度)=39
self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(39,))
# 动作空间为18(关节角度)
self.action_space = gym.spaces.Box(low=-1, high=1, shape=(18,))
self.t = 1
# 获取原始数据,并生成状态,同时更新done
def get_state(self):
state = np.array(self.observation_space.sample()).reshape(1,39)
self.t = self.t+1
self.done = True if self.t==50 else False
return state
# 奖励函数
def get_reward(self):
if self.t == 10:
reward = 1
else:
reward = 0
return reward
# 主程序
def step(self, action):
reward = self.get_reward()
state = self.get_state()
# print(self.t,self.done,reward)
return state, reward, self.done, {}
# 重置环境
def reset(self):
self.t = 1
state = self.get_state()
return state
# 关闭机器人
def close(self):
pass
if __name__=='__main__':
U=RobotEnv()
for i in range(2):
U.reset()
while not U.done:
U.step(0)
训练后得到如下结果:
评论(0)
您还未登录,请登录后发表或查看评论