在本系列二中,完成了机械臂的正逆运动学搭建过程,也就是可以指定机械臂的末端到达某一个点了 Losgy浩:基于Pybullet搭建强化学习机械臂(二)
下面的内容将阐述如何利用gym模块来搭建强化学习的训练环境。本文的代码参考的是mujoco中的fetch机器人环境。
本文完整项目地址https://github.com/PiggyCh/RL_arm_under_sparse_reward
首先,我们需要写一个类,来满足agent的各种步骤调用过程。这个类是基于gym的,因此我们直接继承gym.env,并在此基础上对函数进行重写
class bmirobotGymEnv(gym.Env):
重温一下强化学习的基本MDP框架:

所以在API方面我们需要重写的函数如下:
reward: 计算奖励函数
get_state: 得到返回的状态向量
action: 实施动作的函数
done:判断是否操作成功的函数
reset: 重置环境的函数
下面我们分别实现:
1. Init对类进行初始化
def __init__(
self, model_path, n_substeps, gripper_extra_height, block_gripper,
has_object, target_in_the_air, target_offset, obj_range, target_range,
distance_threshold, initial_qpos, reward_type,
):
"""Initializes a new Fetch environment.
Args:
model_path (string): XML文件的路径,这里可以写URDF,在bmirobot里用的是Pybullet环境
n_substeps (int): 目前推测n-substep是 每次step用的步数。比如一个动作发出后,后续25个时间步骤就继续执行动作
gripper_extra_height (float): 当定位夹持器时,额外的高度高于桌子
block_gripper (boolean): 抓手是否被阻塞(即不能移动)
has_object (boolean):环境中是否有对象
target_in_the_air (boolean):目标物是否应该在桌子上方的空中或桌面上
target_offset (float or array with 3 elements): 目标偏移量
obj_range (float): 初始目标位置采样的均匀分布范围
target_range (float):采样目标的均匀分布范围
distance_threshold (float): 目标达到之后的临界值
initial_qpos (dict):定义初始配置的联合名称和值的字典
reward_type ('sparse' or 'dense'):奖励类型,如稀疏或密集
"""
IS_USEGUI = Args().Use_GUI
self.gripper_extra_height = gripper_extra_height
self.block_gripper = block_gripper
self.has_object = has_object
self.target_in_the_air = target_in_the_air
self.target_offset = target_offset
self.obj_range = obj_range
self.target_range = target_range
self.distance_threshold = distance_threshold
self.reward_type = reward_type
self.model_path=model_path
self.n_substeps=n_substeps
self.n_actions=4
self.blockUid = -1
self.initial_qpos=initial_qpos
self._urdfRoot = pybullet_data.getDataPath()
self.seed()
#是否进行渲染,GUI是图形界面,direct是不渲染
if IS_USEGUI:
self.physics = p.connect(p.GUI)
else:
self.physics = p.connect(p.DIRECT)
#加载机器人模型
self._bmirobot = bmirobotv0()
self._timeStep= 1. / 240.
action_dim = 4
self._action_bound = 0.5
# 这里的action和obs space 的low and high 可能需要再次考虑
action_high = np.array([self._action_bound] * action_dim)
self.action_space = spaces.Box(-action_high, action_high)
#重置环境
self.reset()
# GoalEnv methods
在init中我们设置了一些基本的参数方便我们在后续设置函数中使用
2. reward函数:
奖励的设置在强化学习中,一般设置的方式有 密集奖励 和 稀疏奖励。
所谓密集奖励,指的是为了完成任务,我们在空间中设置丰富的奖励来引导智能体一步一步完成任务。设置密集奖励存在一些问题,例如,针对特定的任务我们需要设定不同特定的奖励,而且简单的线性变换会使得最终agent训练的结果有很大的不同,我们需要不断地调整才能让训练有较好的收敛效果,等等一系列问题既不优雅,也不好处理。
另一种奖励方式是稀疏奖励,即空间中的奖励很少,智能体缺乏奖励引导,例如,只有当agent完成任务,我们才给他一个reward signal,不然每个操作的奖励是-1的惩罚,这样的设置相比密集奖励的好处,当然是非常优雅的,但是,稀疏奖励的问题往往对于agent学习而言也是一个挑战,文本设置了稀疏奖励的方式,并且为了解决稀疏奖励的学习难度,添加了几个技巧,这在后续训练的过程中,会继续介绍。
首先我们先写好奖励的函数
def compute_reward(self, achieved_goal, goal, info):
# Compute distance between goal and the achieved goal.
d = goal_distance(achieved_goal, goal)
if self.reward_type == 'sparse':
return -(d > self.distance_threshold).astype(np.float32)
else:
return -d
def goal_distance(goal_a, goal_b):
assert goal_a.shape == goal_b.shape
# np.linalg.norm指求范数,默认是l2范数
return np.linalg.norm(goal_a - goal_b, axis=-1)
可以看到这里我们设置了一个阈值distance_threshold
,当前方块的位置与目标位置的距离在阈值内时,我们就判定成功。
如果奖励是非稀疏的,则是将二者的距离d作为了奖励信号。
3. 获取状态函数
在强化学习中,我们是根据环境所给我们的状态来确定采取什么动作的,也就是对于强化学习算法而言,状态是作为一个输入向量进行计算的,这里我们是利用pybullet提供的一些api来计算相关的参数。
def _get_obs(self):
# 关于机械臂的状态观察,可以从以下几个维度进行考虑
# 末端位置、夹持器状态位置、物体位置、物体姿态、 物体相对末端位置、物体线速度、物体角速度、末端速度、物体相对末端线速度
# 末端位置 3vec 及速度
end_pos = np.array(self._bmirobot.getObservation())
end_pos = end_pos[:3]
# 夹持器位置、姿姿 vec3 *2 ,可能需要重新考虑一下末端位置和grip的关系
gripperState = p.getLinkState(self._bmirobot.bmirobotid, self._bmirobot.bmirobot_righthand,
computeLinkVelocity=1)
gripperPos = np.array(gripperState[4])
gripperOrn_temp = np.array(gripperState[5])
gripper_linear_Velocity = np.array(gripperState[6])
gripper_angular_Velocity = np.array(gripperState[7])
# 把四元数转换成欧拉角,使数据都是三维的
gripperOrn = p.getEulerFromQuaternion(gripperOrn_temp)
gripperOrn = np.array(gripperOrn)
# 物体位置、姿态
blockPos, blockOrn_temp = p.getBasePositionAndOrientation(self.blockUid)
blockPos = np.array(blockPos)
blockOrn = p.getEulerFromQuaternion(gripperOrn_temp)
blockOrn = np.array(blockOrn)
# 物体相对位置 vec *3
relative_pos = blockPos - gripperPos
# 物体的线速度和角速度
block_Velocity = p.getBaseVelocity(self.blockUid)
block_linear_velocity = np.array(block_Velocity[0])
target_pos = np.array(p.getBasePositionAndOrientation(self.targetUid)[0])
block_angular_velocity = np.array(block_Velocity[1])
obs = [
end_pos.flatten(),
gripperOrn.flatten(),
gripper_linear_Velocity.flatten(),
gripper_angular_Velocity.flatten(),
blockPos.flatten(),
blockOrn.flatten(),
relative_pos.flatten(),
block_linear_velocity.flatten(),
block_angular_velocity.flatten(),
]
if not self.has_object:
achieved_goal = end_pos.copy()
else:
achieved_goal = blockPos.copy()
for i in range(1, len(obs)):
end_pos = np.append(end_pos, obs[i])
obs = end_pos.reshape(-1)
self._observation = obs
return {
'observation': obs.copy(),
'achieved_goal': achieved_goal.copy(),
'desired_goal': target_pos.flatten(),
}
我们获取到的环境信息有30维,在注释中已经说明了获取的内容,简单而言就是 机械臂、方块、目标位置的 位置、姿态、角速度、线速度等信息。
用到的pybullet的API有: p.getLinkState:获取杆的状态
p.getEulerFromQuaternion:将一个四元数转化成欧拉角
p.getBasePositionAndOrientation:获取位置和姿态
p.getBaseVelocity:获取速度
对于不同的 API是有不同的返回值,详细信息还要查阅官方的文档pybullet quickly start
: https://docs.google.com/document/d/10sXEhzFRSnvFcl3XxNGhnD4N2SedqwdAvK3dsihxVUA/edit#
然后我们将它展成一个向量,因为都是numpy的数据格式,所以利用.flatten()把各个三维的向量展平。
最终我们返回的是一个字典形式,其中observation是上面展平的向量、achieved_goal是方块当前的位置、desired_goal则是期望的方块位置。
之所以写成这种形式是为了满足后续使用算法的需要,如果自己设计算法的话,完全可以直接返回展平的向量即可,不过不要忘了在向量中添加目标位置的信息,本文是单独作为字典中的键值来输出的。
4. action函数
action函数是我们传出了观察之后,经过算法的计算出了一个动作返回,我们实施这个动作,控制动作可以直接对机械臂的关节角施加角度或扭矩,不过这种方式完全没有必要,我们可以通过在二中描述的正逆运动学的方式,完成末端手的位置和姿态控制。

当然,如果agent是一个拥有多个关节的,例如蛇形机器人,那么采用扭矩输出的方式就合情合理了,毕竟那么大的关节空间,想要用逆运动学计算出一个解,也是相当难以计算出来的。
我们整合了在二中得到的正逆运动学的方式,完成整个动作的函数如下:
def _set_action(self, action):
self._bmirobot.applyAction(action)
def applyAction(self, motorCommands): #4 actiions
limit_x=[-1,1]
limit_y=[-1,1]
limit_z=[0,1]
def clip_val(val,limit):
if val<limit[0]:
return limit[0]
if val>limit[1]:
return limit[1]
return val
if (self.useInverseKinematics):
dx = motorCommands[0]
dy = motorCommands[1]
dz = motorCommands[2]
fingerAngle = motorCommands[3] # fingerangle应该是末端开口大小
state = p.getLinkState(self.bmirobotid, self.bmirobot_righthand) # 似乎并非如此,此处是getlinkstate
actualEndEffectorPos = state[4]
self.endEffectorPos[0] = clip_val(actualEndEffectorPos[0] + dx,limit_x)
self.endEffectorPos[1] = clip_val(actualEndEffectorPos[1] + dy,limit_y)
self.endEffectorPos[2] = clip_val(actualEndEffectorPos[2] + dz,limit_z)
orn = [0, 0, 0]
pos = self.endEffectorPos
jointPoses = inverse.getinversePoisition(self.bmirobotid, pos)
if (self.useSimulation):
for i in range(3, 10):
p.setJointMotorControl2(bodyIndex=self.bmirobotid,
jointIndex=i,
controlMode=p.POSITION_CONTROL,
targetPosition=jointPoses[i - 3],
targetVelocity=0,
force=500,
positionGain=0.03,
velocityGain=1)
self.sent_hand_moving(fingerAngle)
def sent_hand_moving(self,moterCommand):
right_hand_joint = 10
right_hand_joint2 = 11
limit = [-1.57079632679, 1.57079632679]
right_hand_joint_now = p.getJointState(self.bmirobotid, right_hand_joint)[0]
right_hand_joint2_now = p.getJointState(self.bmirobotid, right_hand_joint2)[0]
moterCommand_1 = right_hand_joint_now + moterCommand
moterCommand_2 = right_hand_joint2_now - moterCommand
#print("目前: ", str(right_hand_joint_now), " 到: ", str(moterCommand_1))
# if moterCommand > 0:
# moterCommand = limit[1]
# if moterCommand <= 0:
# moterCommand = limit[0]
p.setJointMotorControl2(bodyIndex=self.bmirobotid,
jointIndex=right_hand_joint,
controlMode=p.POSITION_CONTROL,
targetPosition=moterCommand_1,
targetVelocity=0,
force=500,
positionGain=0.03,
velocityGain=1)
p.setJointMotorControl2(bodyIndex=self.bmirobotid,
jointIndex=right_hand_joint2,
controlMode=p.POSITION_CONTROL,
targetPosition=moterCommand_2,
targetVelocity=0,
force=500,
positionGain=0.03,
velocityGain=1)
需要说明的是,实际上我们控制末端位置主要是 xyz以及末端手的张开大小,xyz我们利用逆运动学来求解即可,手臂末端的大小,对于两个关节而言实际上是相反的,而且只需要一个值。除此之外我们也需要多对动作进行裁减,太大的动作是不允许的。
5. step函数
step函数是推进智能体进入下一个循环,获取观察、动作、奖励等等信息。
def step(self, action):
action = np.clip(action,-0.5,0.5)
action[3]=0
# if (p.getClosestPoints(self._bmirobot.bmirobotid, self.blockUid, 0.0001)): #如果臂和块足够靠近,可以锁死手爪
# action[3]=-1
self._set_action(action)
# print(action[3])
#一个动作执行20个仿真步
for _ in range(self.n_substeps):
p.stepSimulation()
obs = self._get_obs()
done = False
info = {
'is_success': self._is_success(obs['achieved_goal'], self.goal),
}
reward = self.compute_reward(obs['achieved_goal'], self.goal, info)
return obs, reward, done, info
需要注意的是,一方面,对于每一个动作,我们执行n个子步骤,即我们需要规定动作频率,在通常的设置下,每一次p.stepSimulation()默认计算1/240s,
如果我们的子步骤设置为20,则动作的频率为1/240s*20=1/12s 也就是12HZ。
另外,在抓取任务当中,当臂和块足够靠近,我们可以锁死手爪防止物体掉落。
最后在return中我们返回这些信息。
6. reset函数
虽然pybullet有本身的自带的重置仿真器的函数,但是一个重要的问题是,每次重置仿真器,系统都需要重新载入模型,这就是一个不可忽略的开销,因此我们不使用这个方法,而是重置环境中的状态,将机械臂的各个关节设置为初始位置,并重置方块和目标位置。
def reset(self):
p.setPhysicsEngineParameter(numSolverIterations=150)
# 选择约束求解器迭代的最大次数。如果达到了solverResidualThreshold,求解器可能会在numsolver迭代之前终止
for i in range(24):
p.resetJointState(self._bmirobot.bmirobotid, i, 0, 0) #设置机械臂关节角归位
p.setTimeStep(self._timeStep)
# Cube Pos
for _ in range(100): #随机方块和目标位置,确保二者不会相距太近
xpos = 0.15 +0.2 * random.random() # 0.35
ypos = (random.random() * 0.3) + 0.2 # 0.10 0.50
zpos = 0.2
ang = 3.14 * 0.5 + 3.1415925438 * random.random()
orn = p.getQuaternionFromEuler([0, 0, ang])
# target Position:
xpos_target = 0.35 * random.random() # 0.35
ypos_target = (random.random() * 0.3) + 0.2 # 0.10 0.50
zpos_target = 0.2
ang_target = 3.14 * 0.5 + 3.1415925438 * random.random()
orn_target = p.getQuaternionFromEuler([0, 0, ang_target])
self.dis_between_target_block = math.sqrt(
(xpos - xpos_target) ** 2 + (ypos - ypos_target) ** 2 + (zpos - zpos_target) ** 2)
if self.dis_between_target_block >= 0.15:
break
if self.blockUid == -1:
self.blockUid = p.loadURDF("URDF_model/cube_small_push.urdf", xpos, ypos, zpos,
orn[0], orn[1], orn[2], orn[3])
self.targetUid = p.loadURDF("URDF_model/cube_small_target_push.urdf",
[xpos_target, ypos_target, zpos_target],
orn_target, useFixedBase=1)
else:
p.removeBody(self.blockUid) #首次不需要remove,其他情况需要remove后再添加
p.removeBody(self.targetUid)
self.blockUid = p.loadURDF("URDF_model/cube_small_push.urdf", xpos, ypos, zpos,
orn[0], orn[1], orn[2], orn[3])
self.targetUid = p.loadURDF("URDF_model/cube_small_target_push.urdf",
[xpos_target, ypos_target, zpos_target],
orn_target, useFixedBase=1)
#print([xpos_target,ypos_target,zpos_target])
p.setCollisionFilterPair(self.targetUid, self.blockUid, -1, -1, 0) #实际上在URDF模型中我们就取消了cube_target的碰撞属性。
self.goal=np.array([xpos_target,ypos_target,zpos_target])
p.setGravity(0, 0, -10)
self._envStepCounter = 0
obs = self._get_obs()
self._observation = obs
return self._observation
至此我们就完成了强化学习的训练环境。
评论(0)
您还未登录,请登录后发表或查看评论