基于gym环境搭建机械臂强化学习训练环境

在本系列二中,完成了机械臂的正逆运动学搭建过程,也就是可以指定机械臂的末端到达某一个点了 Losgy浩:基于Pybullet搭建强化学习机械臂(二)

下面的内容将阐述如何利用gym模块来搭建强化学习的训练环境。本文的代码参考的是mujoco中的fetch机器人环境。

本文完整项目地址https://github.com/PiggyCh/RL_arm_under_sparse_reward

首先,我们需要写一个类,来满足agent的各种步骤调用过程。这个类是基于gym的,因此我们直接继承gym.env,并在此基础上对函数进行重写

 class bmirobotGymEnv(gym.Env):

重温一下强化学习的基本MDP框架:

所以在API方面我们需要重写的函数如下:

reward: 计算奖励函数

get_state: 得到返回的状态向量

action: 实施动作的函数

done:判断是否操作成功的函数

reset: 重置环境的函数

下面我们分别实现:

1. Init对类进行初始化

 def __init__(
         self, model_path, n_substeps, gripper_extra_height, block_gripper,
         has_object, target_in_the_air, target_offset, obj_range, target_range,
         distance_threshold, initial_qpos, reward_type,
     ):
         """Initializes a new Fetch environment.
         Args:
             model_path (string): XML文件的路径,这里可以写URDF,在bmirobot里用的是Pybullet环境
             n_substeps (int): 目前推测n-substep是 每次step用的步数。比如一个动作发出后,后续25个时间步骤就继续执行动作
             gripper_extra_height (float): 当定位夹持器时,额外的高度高于桌子
             block_gripper (boolean): 抓手是否被阻塞(即不能移动)
             has_object (boolean):环境中是否有对象
             target_in_the_air (boolean):目标物是否应该在桌子上方的空中或桌面上
             target_offset (float or array with 3 elements): 目标偏移量
             obj_range (float): 初始目标位置采样的均匀分布范围
             target_range (float):采样目标的均匀分布范围
             distance_threshold (float): 目标达到之后的临界值
             initial_qpos (dict):定义初始配置的联合名称和值的字典
             reward_type ('sparse' or 'dense'):奖励类型,如稀疏或密集
         """
         IS_USEGUI = Args().Use_GUI
         self.gripper_extra_height = gripper_extra_height
         self.block_gripper = block_gripper
         self.has_object = has_object
         self.target_in_the_air = target_in_the_air
         self.target_offset = target_offset
         self.obj_range = obj_range
         self.target_range = target_range
         self.distance_threshold = distance_threshold
         self.reward_type = reward_type
         self.model_path=model_path
         self.n_substeps=n_substeps
         self.n_actions=4
         self.blockUid = -1
         self.initial_qpos=initial_qpos
         self._urdfRoot = pybullet_data.getDataPath()
         self.seed()
         #是否进行渲染,GUI是图形界面,direct是不渲染
         if IS_USEGUI:
             self.physics = p.connect(p.GUI)
         else:
             self.physics = p.connect(p.DIRECT)
         #加载机器人模型
         self._bmirobot = bmirobotv0()
         self._timeStep= 1. / 240.
         action_dim = 4
         self._action_bound = 0.5
         # 这里的action和obs space 的low and high 可能需要再次考虑
         action_high = np.array([self._action_bound] * action_dim)
         self.action_space = spaces.Box(-action_high, action_high)
         #重置环境
         self.reset()
         # GoalEnv methods

在init中我们设置了一些基本的参数方便我们在后续设置函数中使用

2. reward函数:

奖励的设置在强化学习中,一般设置的方式有 密集奖励 和 稀疏奖励。

所谓密集奖励,指的是为了完成任务,我们在空间中设置丰富的奖励来引导智能体一步一步完成任务。设置密集奖励存在一些问题,例如,针对特定的任务我们需要设定不同特定的奖励,而且简单的线性变换会使得最终agent训练的结果有很大的不同,我们需要不断地调整才能让训练有较好的收敛效果,等等一系列问题既不优雅,也不好处理。

另一种奖励方式是稀疏奖励,即空间中的奖励很少,智能体缺乏奖励引导,例如,只有当agent完成任务,我们才给他一个reward signal,不然每个操作的奖励是-1的惩罚,这样的设置相比密集奖励的好处,当然是非常优雅的,但是,稀疏奖励的问题往往对于agent学习而言也是一个挑战,文本设置了稀疏奖励的方式,并且为了解决稀疏奖励的学习难度,添加了几个技巧,这在后续训练的过程中,会继续介绍。

首先我们先写好奖励的函数

 def compute_reward(self, achieved_goal, goal, info):
         # Compute distance between goal and the achieved goal.
         d = goal_distance(achieved_goal, goal)
         if self.reward_type == 'sparse':
             return -(d > self.distance_threshold).astype(np.float32)
         else:
             return -d
          
 def goal_distance(goal_a, goal_b):
     assert goal_a.shape == goal_b.shape
     # np.linalg.norm指求范数,默认是l2范数
     return np.linalg.norm(goal_a - goal_b, axis=-1)

可以看到这里我们设置了一个阈值distance_threshold ,当前方块的位置与目标位置的距离在阈值内时,我们就判定成功。

如果奖励是非稀疏的,则是将二者的距离d作为了奖励信号。

3. 获取状态函数

在强化学习中,我们是根据环境所给我们的状态来确定采取什么动作的,也就是对于强化学习算法而言,状态是作为一个输入向量进行计算的,这里我们是利用pybullet提供的一些api来计算相关的参数。

 def _get_obs(self):
         # 关于机械臂的状态观察,可以从以下几个维度进行考虑
         # 末端位置、夹持器状态位置、物体位置、物体姿态、  物体相对末端位置、物体线速度、物体角速度、末端速度、物体相对末端线速度
         # 末端位置 3vec 及速度
         end_pos = np.array(self._bmirobot.getObservation())
         end_pos = end_pos[:3]
         # 夹持器位置、姿姿 vec3 *2  ,可能需要重新考虑一下末端位置和grip的关系
         gripperState = p.getLinkState(self._bmirobot.bmirobotid, self._bmirobot.bmirobot_righthand,
                                       computeLinkVelocity=1)
         gripperPos = np.array(gripperState[4])
         gripperOrn_temp = np.array(gripperState[5])
         gripper_linear_Velocity = np.array(gripperState[6])
         gripper_angular_Velocity = np.array(gripperState[7])
         # 把四元数转换成欧拉角,使数据都是三维的
         gripperOrn = p.getEulerFromQuaternion(gripperOrn_temp)
         gripperOrn = np.array(gripperOrn)
         # 物体位置、姿态
         blockPos, blockOrn_temp = p.getBasePositionAndOrientation(self.blockUid)
         blockPos = np.array(blockPos)
         blockOrn = p.getEulerFromQuaternion(gripperOrn_temp)
         blockOrn = np.array(blockOrn)
         # 物体相对位置 vec *3
         relative_pos = blockPos - gripperPos
         # 物体的线速度和角速度
         block_Velocity = p.getBaseVelocity(self.blockUid)
         block_linear_velocity = np.array(block_Velocity[0])
         target_pos = np.array(p.getBasePositionAndOrientation(self.targetUid)[0])
         block_angular_velocity = np.array(block_Velocity[1])
 ​
         obs = [
             end_pos.flatten(),
             gripperOrn.flatten(),
             gripper_linear_Velocity.flatten(),
             gripper_angular_Velocity.flatten(),
             blockPos.flatten(),
             blockOrn.flatten(),
             relative_pos.flatten(),
             block_linear_velocity.flatten(),
             block_angular_velocity.flatten(),
         ]
         if not self.has_object:
             achieved_goal = end_pos.copy()
         else:
             achieved_goal = blockPos.copy()
         for i in range(1, len(obs)):
             end_pos = np.append(end_pos, obs[i])
         obs = end_pos.reshape(-1)
         self._observation = obs
         return {
             'observation': obs.copy(),
             'achieved_goal': achieved_goal.copy(),
             'desired_goal': target_pos.flatten(),
         }

我们获取到的环境信息有30维,在注释中已经说明了获取的内容,简单而言就是 机械臂、方块、目标位置的 位置、姿态、角速度、线速度等信息。

用到的pybullet的API有: p.getLinkState:获取杆的状态 p.getEulerFromQuaternion:将一个四元数转化成欧拉角

p.getBasePositionAndOrientation:获取位置和姿态 p.getBaseVelocity:获取速度

对于不同的 API是有不同的返回值,详细信息还要查阅官方的文档pybullet quickly start: docs.google.com/documen

然后我们将它展成一个向量,因为都是numpy的数据格式,所以利用.flatten()把各个三维的向量展平。

最终我们返回的是一个字典形式,其中observation是上面展平的向量、achieved_goal是方块当前的位置、desired_goal则是期望的方块位置。

之所以写成这种形式是为了满足后续使用算法的需要,如果自己设计算法的话,完全可以直接返回展平的向量即可,不过不要忘了在向量中添加目标位置的信息,本文是单独作为字典中的键值来输出的。

4. action函数

action函数是我们传出了观察之后,经过算法的计算出了一个动作返回,我们实施这个动作,控制动作可以直接对机械臂的关节角施加角度或扭矩,不过这种方式完全没有必要,我们可以通过在二中描述的正逆运动学的方式,完成末端手的位置和姿态控制。

当然,如果agent是一个拥有多个关节的,例如蛇形机器人,那么采用扭矩输出的方式就合情合理了,毕竟那么大的关节空间,想要用逆运动学计算出一个解,也是相当难以计算出来的。

我们整合了在二中得到的正逆运动学的方式,完成整个动作的函数如下:

 def _set_action(self, action):
         self._bmirobot.applyAction(action)
         
      def applyAction(self, motorCommands): #4 actiions
       limit_x=[-1,1]
       limit_y=[-1,1]
       limit_z=[0,1]
       def clip_val(val,limit):
           if val<limit[0]:
               return limit[0]
           if val>limit[1]:
               return limit[1]
           return val
       if (self.useInverseKinematics):
           dx = motorCommands[0]
           dy = motorCommands[1]
           dz = motorCommands[2]
           fingerAngle = motorCommands[3]  # fingerangle应该是末端开口大小
           state = p.getLinkState(self.bmirobotid, self.bmirobot_righthand)  # 似乎并非如此,此处是getlinkstate
           actualEndEffectorPos = state[4]
           self.endEffectorPos[0] = clip_val(actualEndEffectorPos[0] + dx,limit_x)
           self.endEffectorPos[1] = clip_val(actualEndEffectorPos[1] + dy,limit_y)
           self.endEffectorPos[2] = clip_val(actualEndEffectorPos[2] + dz,limit_z)
           orn = [0, 0, 0]
           pos = self.endEffectorPos
           jointPoses = inverse.getinversePoisition(self.bmirobotid, pos)
           if (self.useSimulation):
               for i in range(3, 10):
                   p.setJointMotorControl2(bodyIndex=self.bmirobotid,
                                           jointIndex=i,
                                           controlMode=p.POSITION_CONTROL,
                                           targetPosition=jointPoses[i - 3],
                                           targetVelocity=0,
                                           force=500,
                                           positionGain=0.03,
                                           velocityGain=1)
           self.sent_hand_moving(fingerAngle)
         
       def sent_hand_moving(self,moterCommand):
       right_hand_joint = 10
       right_hand_joint2 = 11
       limit = [-1.57079632679, 1.57079632679]
       right_hand_joint_now = p.getJointState(self.bmirobotid, right_hand_joint)[0]
       right_hand_joint2_now = p.getJointState(self.bmirobotid, right_hand_joint2)[0]
       moterCommand_1 = right_hand_joint_now + moterCommand
       moterCommand_2 = right_hand_joint2_now - moterCommand
       #print("目前: ", str(right_hand_joint_now), " 到: ", str(moterCommand_1))
       # if moterCommand > 0:
       #     moterCommand = limit[1]
       # if moterCommand <= 0:
       #     moterCommand = limit[0]
       p.setJointMotorControl2(bodyIndex=self.bmirobotid,
                               jointIndex=right_hand_joint,
                               controlMode=p.POSITION_CONTROL,
                               targetPosition=moterCommand_1,
                               targetVelocity=0,
                               force=500,
                               positionGain=0.03,
                               velocityGain=1)
       p.setJointMotorControl2(bodyIndex=self.bmirobotid,
                               jointIndex=right_hand_joint2,
                               controlMode=p.POSITION_CONTROL,
                               targetPosition=moterCommand_2,
                               targetVelocity=0,
                               force=500,
                               positionGain=0.03,
                               velocityGain=1)

需要说明的是,实际上我们控制末端位置主要是 xyz以及末端手的张开大小,xyz我们利用逆运动学来求解即可,手臂末端的大小,对于两个关节而言实际上是相反的,而且只需要一个值。除此之外我们也需要多对动作进行裁减,太大的动作是不允许的。

5. step函数

step函数是推进智能体进入下一个循环,获取观察、动作、奖励等等信息。

def step(self, action):
         action = np.clip(action,-0.5,0.5)
         action[3]=0
         # if (p.getClosestPoints(self._bmirobot.bmirobotid, self.blockUid, 0.0001)): #如果臂和块足够靠近,可以锁死手爪
         #     action[3]=-1
         self._set_action(action)
         # print(action[3])
         #一个动作执行20个仿真步
         for _ in range(self.n_substeps):
             p.stepSimulation()
         obs = self._get_obs()
         done = False
         info = {
             'is_success': self._is_success(obs['achieved_goal'], self.goal),
         }
         reward = self.compute_reward(obs['achieved_goal'], self.goal, info)
         return obs, reward, done, info

需要注意的是,一方面,对于每一个动作,我们执行n个子步骤,即我们需要规定动作频率,在通常的设置下,每一次p.stepSimulation()默认计算1/240s,

如果我们的子步骤设置为20,则动作的频率为1/240s*20=1/12s 也就是12HZ。

另外,在抓取任务当中,当臂和块足够靠近,我们可以锁死手爪防止物体掉落。

最后在return中我们返回这些信息。

6. reset函数

虽然pybullet有本身的自带的重置仿真器的函数,但是一个重要的问题是,每次重置仿真器,系统都需要重新载入模型,这就是一个不可忽略的开销,因此我们不使用这个方法,而是重置环境中的状态,将机械臂的各个关节设置为初始位置,并重置方块和目标位置。

 def reset(self):
         p.setPhysicsEngineParameter(numSolverIterations=150)
         # 选择约束求解器迭代的最大次数。如果达到了solverResidualThreshold,求解器可能会在numsolver迭代之前终止
         for i in range(24):
             p.resetJointState(self._bmirobot.bmirobotid, i, 0, 0) #设置机械臂关节角归位
         p.setTimeStep(self._timeStep)
         # Cube Pos
         for _ in range(100):  #随机方块和目标位置,确保二者不会相距太近
             xpos = 0.15 +0.2 * random.random()  # 0.35
             ypos = (random.random() * 0.3) + 0.2  # 0.10 0.50
             zpos = 0.2
             ang = 3.14 * 0.5 + 3.1415925438 * random.random()
             orn = p.getQuaternionFromEuler([0, 0, ang])
             # target Position:
             xpos_target = 0.35 * random.random()  # 0.35
             ypos_target = (random.random() * 0.3) + 0.2  # 0.10 0.50
             zpos_target = 0.2
             ang_target = 3.14 * 0.5 + 3.1415925438 * random.random()
             orn_target = p.getQuaternionFromEuler([0, 0, ang_target])
             self.dis_between_target_block = math.sqrt(
                 (xpos - xpos_target) ** 2 + (ypos - ypos_target) ** 2 + (zpos - zpos_target) ** 2)
             if self.dis_between_target_block >= 0.15:
                 break
         if self.blockUid == -1:
             self.blockUid = p.loadURDF("URDF_model/cube_small_push.urdf", xpos, ypos, zpos,
                                        orn[0], orn[1], orn[2], orn[3])
             self.targetUid = p.loadURDF("URDF_model/cube_small_target_push.urdf",
                                         [xpos_target, ypos_target, zpos_target],
                                         orn_target, useFixedBase=1)
         else:
             p.removeBody(self.blockUid) #首次不需要remove,其他情况需要remove后再添加
             p.removeBody(self.targetUid)
             self.blockUid = p.loadURDF("URDF_model/cube_small_push.urdf", xpos, ypos, zpos,
                                        orn[0], orn[1], orn[2], orn[3])
             self.targetUid = p.loadURDF("URDF_model/cube_small_target_push.urdf",
                                         [xpos_target, ypos_target, zpos_target],
                                         orn_target, useFixedBase=1)
         #print([xpos_target,ypos_target,zpos_target])
         p.setCollisionFilterPair(self.targetUid, self.blockUid, -1, -1, 0) #实际上在URDF模型中我们就取消了cube_target的碰撞属性。
         self.goal=np.array([xpos_target,ypos_target,zpos_target])
         p.setGravity(0, 0, -10)
         self._envStepCounter = 0
         obs = self._get_obs()
         self._observation = obs
         return self._observation

至此我们就完成了强化学习的训练环境。