【Tello无人机】Tello飞行控制


tello

上一篇介绍了Tello无人机的仿真环境搭建,本篇将介绍tello无人机在pybullet环境中的飞行控制,实现无人机的速度控制。本环境最终要实现强化学习算法下的飞行任务,故采用通用的gym接口进行环境搭建。

Gym环境接口


tello

OpenAI Gym 包含许多有趣的环境,从具有经典控制任务的环境到可让你训练智能体玩 Atari 游戏的环境。但本文是想自己定义一个tello无人机飞行的环境,同时为了兼容强化学习算法,采用gym接口进行封装。gym 对环境镜像了抽象,无论多复杂的环境,最终暴露出来的只有gym.Env抽象的5个方法,本文在gym-pybullet-drones代码的基础上进行修改:

BaseAviary类

class BaseAviary(gym.Env):
    """该类是gym.Env的子类,表明它是用于强化学习环境的基本结构。以下是对每个部分的功能说明:"""
    def __init__(self,
                 num_drones: int = 1,
                 initial_xyzs=None,
                 initial_rpys=None,
                 pyb_freq: int = 240,
                 ctrl_freq: int = 240,
                 gui=False,
                 obstacles=False,
                 user_control_debug=False,
                 output_folder='results'
                 ):
      """用于初始化环境的实例。接受多个参数,包括无人机数量(num_drones)、初始位置和姿态信息(initial_xyzs 和 initial_rpys)、PyBullet模拟的频率(pyb_freq)、控制频率(ctrl_freq)、是否使用GUI进行可视化(gui)、是否包含障碍物(obstacles)、是否启用用户控制调试模式(user_control_debug)以及结果输出文件夹路径(output_folder)。"""
    def reset(self, seed: int = None, options: dict = None):
        """用于重置环境的状态。可以传递种子值(seed)和其他配置选项(options)。"""
    def step(self, action):
        """实现了环境的一个时间步骤。接受动作作为输入,返回包含观察、奖励、终止标志和其他信息的元组。"""
    def render(self, mode='human', close=False):
        """用于将环境可视化。接受模式(mode)和关闭标志(close)等参数。"""
    def close(self):
        """用于关闭环境,释放资源。"""

初始化函数,初始化模型信息,环境信息

 def __init__(self,
                 num_drones: int = 1,
                 initial_xyzs=None,
                 initial_rpys=None,
                 pyb_freq: int = 240,
                 ctrl_freq: int = 60,
                 gui=False,
                 obstacles=False,
                 user_control_debug=False,
                 show_traj=False,
                 output_folder='results'
                 ):

        #### Constants #############################################
        self.aim_h = 0.60
        self.G = 9.8
        self.RAD2DEG = 180 / np.pi
        self.DEG2RAD = np.pi / 180
        self.CTRL_FREQ = ctrl_freq
        self.PYB_FREQ = pyb_freq
        if self.PYB_FREQ % self.CTRL_FREQ != 0:  # 控制频率与系统仿真频率
            raise ValueError('[ERROR] in BaseAviary.__init__(), pyb_freq is not divisible by env_freq.')
        self.PYB_STEPS_PER_CTRL = int(self.PYB_FREQ / self.CTRL_FREQ)
        self.CTRL_TIMESTEP = 1. / self.CTRL_FREQ
        self.PYB_TIMESTEP = 1. / self.PYB_FREQ
        #### Parameters ############################################
        self.NUM_DRONES = num_drones
        #### Options ###############################################
        self.GUI = gui
        self.show_traj = show_traj
        self.OBSTACLES = obstacles
        self.user_control_debug = user_control_debug
        self.URDF = "tello.urdf"
        self.OUTPUT_FOLDER = output_folder
        #### Load the drone properties from the .urdf file ######### 模型参数
        self.M, \
        self.L, \
        self.THRUST2WEIGHT_RATIO, \
        self.J, \
        self.J_INV, \
        self.KF, \
        self.KM, \
        self.COLLISION_H, \
        self.COLLISION_R, \
        self.COLLISION_Z_OFFSET, \
        self.MAX_SPEED_KMH, \
        self.GND_EFF_COEFF, \
        self.PROP_RADIUS, \
        self.DRAG_COEFF, \
        self.DW_COEFF_1, \
        self.DW_COEFF_2, \
        self.DW_COEFF_3 = self._parseURDFParameters()
        print(
            "[INFO] BaseAviary.__init__() loaded parameters from the drone's .urdf:\n[INFO] m {:f}, L {:f},\n[INFO] ixx {:f}, iyy {:f}, izz {:f},\n[INFO] kf {:f}, km {:f},\n[INFO] t2w {:f}, max_speed_kmh {:f},\n[INFO] gnd_eff_coeff {:f}, prop_radius {:f},\n[INFO] drag_xy_coeff {:f}, drag_z_coeff {:f},\n[INFO] dw_coeff_1 {:f}, dw_coeff_2 {:f}, dw_coeff_3 {:f}".format(
                self.M, self.L, self.J[0, 0], self.J[1, 1], self.J[2, 2], self.KF, self.KM, self.THRUST2WEIGHT_RATIO,
                self.MAX_SPEED_KMH, self.GND_EFF_COEFF, self.PROP_RADIUS, self.DRAG_COEFF[0], self.DRAG_COEFF[2],
                self.DW_COEFF_1, self.DW_COEFF_2, self.DW_COEFF_3))
        #### Compute constants #####################################
        self.GRAVITY = self.G * self.M  # 无人机重力
        self.HOVER_RPM = np.sqrt(self.GRAVITY / (4 * self.KF))  # 计算悬停转速
        self.MAX_RPM = np.sqrt((self.THRUST2WEIGHT_RATIO * self.GRAVITY) / (4 * self.KF))  # 计算最大转速
        print("max_a:", self.MAX_RPM)
        self.MAX_THRUST = (4 * self.KF * self.MAX_RPM ** 2)  # 计算最大推力
        self.MAX_XY_TORQUE = (2 * self.L * self.KF * self.MAX_RPM ** 2) / np.sqrt(2)  # xy方向的最大推力
        self.MAX_Z_TORQUE = (2 * self.KM * self.MAX_RPM ** 2)  # z方向的最大推力
        self.GND_EFF_H_CLIP = 0.25 * self.PROP_RADIUS * np.sqrt(
            (15 * self.MAX_RPM ** 2 * self.KF * self.GND_EFF_COEFF) / self.MAX_THRUST)  # 计算地面效益系数

        if self.GUI:
            #### With debug GUI ########################################
            self.CLIENT = p.connect(p.GUI)  # p.connect(p.GUI, options="--opengl2")
            for i in [p.COV_ENABLE_RGB_BUFFER_PREVIEW, p.COV_ENABLE_DEPTH_BUFFER_PREVIEW,
                      p.COV_ENABLE_SEGMENTATION_MARK_PREVIEW, p.COV_ENABLE_GUI]:
                p.configureDebugVisualizer(i, 0, physicsClientId=self.CLIENT)
            # TODO 相机视角
            p.resetDebugVisualizerCamera(cameraDistance=2,
                                         cameraYaw=-0,
                                         cameraPitch=-80,
                                         cameraTargetPosition=[0, 0, 0],
                                         physicsClientId=self.CLIENT
                                         )
            ret = p.getDebugVisualizerCamera(physicsClientId=self.CLIENT)
            print("viewMatrix", ret[2])
            print("projectionMatrix", ret[3])
        else:
            #### Without debug GUI #####################################
            self.CLIENT = p.connect(p.DIRECT)
        #### Set initial poses #####################################
        if initial_xyzs is None:
            self.INIT_XYZS = np.array([[1, -1, 0.2]])
        elif np.array(initial_xyzs).shape == (self.NUM_DRONES, 3):
            self.INIT_XYZS = initial_xyzs
        else:
            print("[ERROR] invalid initial_xyzs in BaseAviary.__init__(), try initial_xyzs.reshape(NUM_DRONES,3)")
        if initial_rpys is None:
            self.INIT_RPYS = np.zeros((self.NUM_DRONES, 3))
        elif np.array(initial_rpys).shape == (self.NUM_DRONES, 3):
            self.INIT_RPYS = initial_rpys
        else:
            print("[ERROR] invalid initial_rpys in BaseAviary.__init__(), try initial_rpys.reshape(NUM_DRONES,3)")
        #### Create action and observation spaces ##################
        self.quaternion_orient_zero = p.getQuaternionFromEuler([0, 0, 0])
        # self.env_dim = 5.0  # 5*5m的仿真环境
        self.random_pos = True
        self.action_space = self._actionSpace()
        self.observation_space = self._observationSpace()
        self.init_env()
        self._task_reset()  # 加载任务场景
        self._updateAndStoreKinematicInformation()
        self.action_list = deque(maxlen=5)

复位函数,将环境恢复到初始状态

    def reset(self, seed: int = None, options: dict = None):
        # TODO : initialize random number generator with seed

        p.resetSimulation(physicsClientId=self.CLIENT)
        self._housekeeping()
        self._updateAndStoreKinematicInformation()
        self.end_point = [0, 0, 0]
        initial_obs = self._computeObs()
        initial_info = self._computeInfo()
        return initial_obs, initial_info

step函数,与环境交互接口

  def step(self, action):
        if self.user_control_debug and self.GUI:
            keys = p.getKeyboardEvents()  # 获取键盘动作
            for k, v in keys.items():
                if k == p.B3G_RIGHT_ARROW and (v & p.KEY_IS_DOWN):
                    action[0] += np.array([1, 0, 0, 0])
                if k == p.B3G_LEFT_ARROW and (v & p.KEY_IS_DOWN):
                    action[0] += np.array([-1, 0, 0, 0])
                if k == p.B3G_UP_ARROW and (v & p.KEY_IS_DOWN):
                    action[0] += np.array([0, 1, 0, 0])
                if k == p.B3G_DOWN_ARROW and (v & p.KEY_IS_DOWN):
                    action[0] += np.array([0, -1, 0, 0])
                if k == ord("1") and (v & p.KEY_IS_DOWN):
                    action[0] += np.array([0, 0, -1, 0])
                if k == ord("3") and (v & p.KEY_IS_DOWN):
                    action[0] += np.array([0, 0, 1, 0])
        self._saveLastAction(action)
        clipped_action = np.reshape(self._preprocessAction(action), (self.NUM_DRONES, 4))
        for _ in range(self.PYB_STEPS_PER_CTRL):  # 一个控制周期内执行物理仿真的步数
            if self.PYB_STEPS_PER_CTRL > 1:
                self._updateAndStoreKinematicInformation()
            self._physics(clipped_action[0, :], 0)
            p.stepSimulation(physicsClientId=self.CLIENT)
            self.last_clipped_action = clipped_action
        self._updateAndStoreKinematicInformation()
        self._showDroneLocalAxes(0)

        obs = self._computeObs()
        reward = self._computeReward()
        terminated = self._computeTerminated()
        truncated = self._computeTruncated()
        info = self._computeInfo()
        self.start_point = [obs[0][0], obs[0][1], 0]
        p.addUserDebugLine(self.start_point, self.end_point, lineColorRGB=[1, 0, 0], lineWidth=10)
        self.end_point = self.start_point
        #### Advance the step counter ##############################
        self.step_counter = self.step_counter + (1 * self.PYB_STEPS_PER_CTRL)
        return obs, reward, terminated, truncated, info

关闭环境

    def close(self):
        p.disconnect(physicsClientId=self.CLIENT)

环境注册

注册也非常简单,只要在任何地方调用gym提供的register方法即可,通常我们会将注册写在环境目录下的__init__.py文件中,这样在导入环境类的时候便能自动注册了,就像下面这样:

from gym.envs.registration import register

register(
    id='tello-v0',
    entry_point='gym_drones.envs:VelocityAviary',
)

无人机飞行控制

控制系统框图

下图为开源飞控控制系统框图,本文要在pybullet中实现仿真飞行,参考如下控制框图,搭建tello飞行控制系统


tello

控制部分代码:


tello

控制器参数

self.P_COEFF_FOR = np.array([1.6, 1.6, 1.25])
self.I_COEFF_FOR = np.array([.05, .05, .05])
self.D_COEFF_FOR = np.array([.2, .2, .5])
self.P_COEFF_TOR = np.array([70000., 70000., 60000.])
self.I_COEFF_TOR = np.array([.0, .0, 500.])
self.D_COEFF_TOR = np.array([20000., 20000., 12000.])
self.PWM2RPM_SCALE = 0.2685
self.PWM2RPM_CONST = 4070.3
self.MIN_PWM = 20000
self.MAX_PWM = 65535
self.MIXER_MATRIX = np.array([
    [-.5, -.5, -1],
    [-.5,  .5,  1],
    [.5, .5, -1],
    [.5, -.5,  1]
])

位置控制

采用PID(比例-积分-微分)控制器

T=P*p_e+I*i_e+D*v_e+[0,0,g]

  • P是比例系数
  • I是积分系数
  • D是微分系数
  • $p_e$ 是位置误差(目标位置 - 实际位置)
  • $i_e$ 是位置误差的积分
  • $v_e$ 是速度误差(目标速度 - 实际速度)
  • $[0, 0, g] $是重力加速度在 z 轴上的分量,作为 PID 输出的偏移量。
def _dslPIDPositionControl(self,
                           control_timestep,
                           cur_pos,
                           cur_quat,
                           cur_vel,
                           target_pos,
                           target_rpy,
                           target_vel
                          ):
    cur_rotation = np.array(p.getMatrixFromQuaternion(cur_quat)).reshape(3, 3)
    #通过将目标位置/速度减去当前位置/速度,计算位置误差(pos_e)和速度误差(vel_e)。
    pos_e = target_pos - cur_pos
    vel_e = target_vel - cur_vel
    #对位置误差进行积分,并进行截断以防止积分项过大。
    self.integral_pos_e = self.integral_pos_e + pos_e*control_timestep
    self.integral_pos_e = np.clip(self.integral_pos_e, -2., 2.)
    self.integral_pos_e[2] = np.clip(self.integral_pos_e[2], -0.15, .15)
    #### PID target thrust #####################################
    target_thrust = np.multiply(self.P_COEFF_FOR, pos_e) \
    + np.multiply(self.I_COEFF_FOR, self.integral_pos_e) \
    + np.multiply(self.D_COEFF_FOR, vel_e) + np.array([0, 0, self.GRAVITY])
    scalar_thrust = max(0., np.dot(target_thrust, cur_rotation[:,2]))
    thrust = (math.sqrt(scalar_thrust / (4*self.KF)) - self.PWM2RPM_CONST) / self.PWM2RPM_SCALE
    target_z_ax = target_thrust / np.linalg.norm(target_thrust)
    target_x_c = np.array([math.cos(target_rpy[2]), math.sin(target_rpy[2]), 0])
    target_y_ax = np.cross(target_z_ax, target_x_c) / np.linalg.norm(np.cross(target_z_ax, target_x_c))
    target_x_ax = np.cross(target_y_ax, target_z_ax)
    target_rotation = (np.vstack([target_x_ax, target_y_ax, target_z_ax])).transpose()
    #### Target rotation #######################################
    target_euler = (Rotation.from_matrix(target_rotation)).as_euler('XYZ', degrees=False)
    if np.any(np.abs(target_euler) > math.pi):
        print("\n[ERROR] ctrl it", self.control_counter, "in Control._dslPIDPositionControl(), values outside range [-pi,pi]")
        return thrust, target_euler, pos_e

姿态控制

采用PID(比例-积分-微分)控制器


T = - P * rot_e + D * \dot{rpy_e} + I * i_{rpy_e}

  • P 是比例系数
  • D 是微分系数
  • I 是积分系数
  • rot_e 是姿态误差(目标四元数 - 实际四元数)
  • $\dot{rpy_e}$是目标欧拉角速度与实际欧拉角速度的差值
  • $ i_{rpy_e}$是姿态误差的积分
def _dslPIDAttitudeControl(self,
                           control_timestep,
                           thrust,
                           cur_quat,
                           target_euler,
                           target_rpy_rates
                          ):
    cur_rotation = np.array(p.getMatrixFromQuaternion(cur_quat)).reshape(3, 3)
    cur_rpy = np.array(p.getEulerFromQuaternion(cur_quat))
    target_quat = (Rotation.from_euler('XYZ', target_euler, degrees=False)).as_quat()
    w,x,y,z = target_quat
    target_rotation = (Rotation.from_quat([w, x, y, z])).as_matrix()
    rot_matrix_e = np.dot((target_rotation.transpose()),cur_rotation) - np.dot(cur_rotation.transpose(),target_rotation)
    rot_e = np.array([rot_matrix_e[2, 1], rot_matrix_e[0, 2], rot_matrix_e[1, 0]]) 
    rpy_rates_e = target_rpy_rates - (cur_rpy - self.last_rpy)/control_timestep
    self.last_rpy = cur_rpy
    self.integral_rpy_e = self.integral_rpy_e - rot_e*control_timestep
    self.integral_rpy_e = np.clip(self.integral_rpy_e, -1500., 1500.)
    self.integral_rpy_e[0:2] = np.clip(self.integral_rpy_e[0:2], -1., 1.)
    #### PID target torques ####################################
    target_torques = - np.multiply(self.P_COEFF_TOR, rot_e) \
    + np.multiply(self.D_COEFF_TOR, rpy_rates_e) \
    + np.multiply(self.I_COEFF_TOR, self.integral_rpy_e)
    target_torques = np.clip(target_torques, -3200, 3200)
    pwm = thrust + np.dot(self.MIXER_MATRIX, target_torques)
    pwm = np.clip(pwm, self.MIN_PWM, self.MAX_PWM)
    return self.PWM2RPM_SCALE * pwm + self.PWM2RPM_CONST

实现效果

跟踪心形曲线

跟踪方形曲线


tello
import random

import gym
import numpy as np

import gym_drones

env = gym.make('tello-v0', gui=True)
print(gym_drones)
action_space = env.action_space

# 计算期望位置数组 desire_Pos
# 定义正方形的顶点
square_vertices = np.array([
    [0.8, -0.8],
    [-0.8, -0.8],
    [-0.8, 0.8],
    [0.8, 0.8],
    [0.8, -0.8]  # 闭合正方形
])
# 创建更多的插值点
num_points = 200
t = np.linspace(0, 1, num_points)
interpolated_points = np.array([np.interp(t, np.linspace(0, 1, len(square_vertices)), square_vertices[:, 0]),
                                np.interp(t, np.linspace(0, 1, len(square_vertices)), square_vertices[:, 1])]).T
# while True:
obs, initial_info = env.reset()
random_action = np.array([[0.0, 0.0, 0.0, 1.0]])
for i in range(200):
    for _ in range(30):
        random_action[0][0] = (interpolated_points[i][0] - obs[0][0]) * 1
        random_action[0][1] = (interpolated_points[i][1] - obs[0][1]) * 1
        obs, reward, terminated, truncated, info = env.step(random_action)
while True:
    pass

下一篇将介绍tello的物理系统及相关控制指令,实现物理系统的控制任务。

参考

[1] [创建最简单的gym自定义环境]https://gqw.github.io/posts/强化学习/

[2]W. Giernacki, J. Rao, S. Sladic, A. Bondyra, M. Retinger and T. Espinoza-Fraire, “DJI Tello Quadrotor as a Platform for Research and Education in Mobile Robotics and Control Engineering,” 2022 International Conference on Unmanned Aircraft Systems (ICUAS), Dubrovnik, Croatia, 2022, pp. 735-744, doi: 10.1109/ICUAS54217.2022.9836168.