上一篇介绍了Tello无人机仿真环境中的飞行控制,本篇将介绍tello无人机在物理系统中的轨迹跟踪,实现实物无人机的速度控制。本文采用的无人机为Tello TT,TELLO Talent由飞行器和拓展配件两部分组成。飞行器配备视觉定位系统,并集成飞控系统,能稳定悬停、飞行。可显示高清画面,并可实现拍照、录影、弹跳、全向翻滚以及一键飞行等功能。Tello相机可拍摄500万像素照片与720ρ高清视频。最长飞行时间约为13分钟。Tello TT SDK通过WiFi-UDP协议与飞行器连接,让用户可以通过文本指令控制飞行器。

无人机控制指令

这里使用库文件DJITelloPy,是一个大疆Tello无人机的Python接口, 使用官方 Tello SDKTello EDU SDK。 这个库支持使用所有的tello命令,可以轻松获取视频流,接受并解析状态包,可以操控多架无人机,支持Python3.6以上版本。

使用pip安装

pip install djitellopy -i https://pypi.tuna.tsinghua.edu.cn/simple/

本系统是要控制无人的速度,采用的接口是send_rc_control(left_right_velocity, forward_backward_velocity, up_down_velocity, yaw_velocity)

Name Type Description Default
left_right_velocity int -100~100 (left/right) _required_
forward_backward_velocity int -100~100 (forward/backward) _required_
up_down_velocity int -100~100 (up/down) _required_
yaw_velocity int -100~100 (yaw) _required_

光学定位系统

OptiTrack是高性能光学跟踪系统,可以获取场地中物体的亚像素级位置。采用被动式红外光学定位技术,每套系统内包含一个中央处理单元和多台红外摄像头,系统内的摄像头由中央处理单元统一调配并进行计算。 在被跟踪的目标物体表面上会固定一些红外反光率很高的标识点,以供系统进行跟踪和测量。下图安装在Tello上的即为定位标识点。

image-20240104155806470

使用Motive软件,根据定位点的位置,构造刚体,如下图所示

image-20240104155806470


可在project视图中查看刚体的的位姿信息,即无人机的位置x,y,z及欧拉角pitch,yaw,roll

image-20240104155806470


在Data Streaming视图中,开启局域网数据广播,用于数据传输,注意要填写本机IP及接收数据电脑的IP地址。

image-20240104155806470

经测试,工作在路由模式的Tello使用速度控制指令进行控制时,延时较高,不满足实验要求,所以采用直连模式进行实验,但此时的发送控制指令的计算机需连接tello无线网,无法接收定位系统的数据,故采用另一个计算机进行数据中转,数据传输框图如下图所示:

image-20240104155806470

Optitrack定位系统数据接收代码

import time

import serial

from NatNetClient import NatNetClient
from util import quaternion_to_euler

positions = {}
rotations = {}


# This is a callback function that gets connected to the NatNet client. It is called once per rigid body per frame
def receive_rigid_body_frame(id, position, rotation_quaternion):
    # Position and rotation received
    positions[id] = position
    # The rotation is in quaternion. We need to convert it to euler angles
    rotx, roty, rotz = quaternion_to_euler(rotation_quaternion)
    # Store the roll pitch and yaw angles
    rotations[id] = (rotx, roty, rotz)


def get_pos():
    clientAddress = "当前电脑IP"
    optitrackServerAddress = "定位系统电脑IP"
    robot_id = 1

    # This will create a new NatNet client
    streaming_client = NatNetClient()
    streaming_client.set_client_address(clientAddress)
    streaming_client.set_server_address(optitrackServerAddress)
    streaming_client.set_use_multicast(True)
    # Configure the streaming client to call our rigid body handler on the emulator to send data out.
    streaming_client.rigid_body_listener = receive_rigid_body_frame

    # Start up the streaming client now that the callbacks are set up.
    # This will run perpetually, and operate on a separate thread.
    is_running = streaming_client.run()
    port = serial.Serial("COM3", 115200)
    count = 0
    while is_running:
        if robot_id in positions:
            # last position
            send_data = [round(num, 4) for num in positions[robot_id]]
            port.write((str(send_data) + "\n").encode())
            print('position', send_data)
        time.sleep(.05)


if __name__ == "__main__":
    clientAddress = "当前电脑IP"
    optitrackServerAddress = "定位系统电脑IP"
    robot_id = 1

    # This will create a new NatNet client
    streaming_client = NatNetClient()
    streaming_client.set_client_address(clientAddress)
    streaming_client.set_server_address(optitrackServerAddress)
    streaming_client.set_use_multicast(True)
    # Configure the streaming client to call our rigid body handler on the emulator to send data out.
    streaming_client.rigid_body_listener = receive_rigid_body_frame

    # Start up the streaming client now that the callbacks are set up.
    # This will run perpetually, and operate on a separate thread.
    is_running = streaming_client.run()
    port = serial.Serial("COM3", 115200)
    count = 0
    while is_running:
        if robot_id in positions:
            # last position
            send_data = [round(num, 4) for num in positions[robot_id]]
            port.write((str(send_data) + "\n").encode())
            print('position', send_data)
        time.sleep(.05)
port.close()

无人机闭环飞行代码

from djitellopy import Tello
import cv2
import pygame
import numpy as np
import time
import math
import matplotlib.pyplot as plt
import serial
import threading
import time
import keyboard
import serial
from queue import Queue
import scipy.io
from datetime import datetime

# 无人机的速度
S = 60
FPS = 120
import keyboard


class Tello_Control(object):
    def __init__(self):
        self.tello = Tello()
        self.speed = 100

        self.tello.connect()
        self.tello.set_speed(self.speed)

    def take_off(self):
        self.tello.takeoff()

    def take_land(self):
        self.tello.land()

    def control(self, for_back_velocity, left_right_velocity, up_down_velocity, yaw_velocity):
        # 将变量的值限制在-100到100之间
        for_back_velocity = int(max(min(for_back_velocity, 100), -100))
        left_right_velocity = int(max(min(left_right_velocity, 100), -100))
        up_down_velocity = int(max(min(up_down_velocity, 100), -100))
        yaw_velocity = int(max(min(yaw_velocity, 100), -100))
        self.tello.send_rc_control(left_right_velocity, for_back_velocity,
                                   up_down_velocity, yaw_velocity)


def polar_to_cartesian(radius, angle):
    x = radius * math.cos(math.radians(angle))
    y = radius * math.sin(math.radians(angle))
    return x, y


trajectory_x = []
trajectory_y = []


def tello_control_thread(data_queue):
    tello = Tello_Control()
    tello.take_off()
    current_count = 0
    radius = 1
    angular_velocity = 3
    while True:
        angle = current_count * angular_velocity
        Px, Py = polar_to_cartesian(radius, angle)
        real_pos = data_queue.get()
        Vx = (Px - real_pos[0]) * 100
        Vy = (Py - real_pos[2]) * 100
        print(Px, real_pos[0], Vx, Py, real_pos[2], Vy)
        tello.control(Vx, Vy, 0, 0)
        if keyboard.is_pressed('esc'):
            print("Exiting the loop")
            break
        current_count += 1
        if angle > 360:
            current_count = 0
    tello.take_land()


trajectory_input = []
trajectory_output = []


def tello_identify_thread(data_queue):
    tello = Tello_Control()
    tello.take_off()
    current_count = 0
    radius = 60
    angular_velocity = 9

    while True:
        angle = current_count * angular_velocity
        Px, Py = polar_to_cartesian(radius, angle)
        real_pos = data_queue.get()
        input_data = [Px, Py]
        output_data = [real_pos[0], real_pos[2]]
        trajectory_input.append(input_data)
        trajectory_output.append(output_data)
        # Vx = (Px - real_pos[0]) * 100
        # Vy = (Py - real_pos[2]) * 100

        tello.control(Px, Py, 0, 0)
        if keyboard.is_pressed('esc'):
            print("Exiting the loop")
            break
        current_count += 1
        if angle >= 360:
            current_count = 1
    data_dict = {
        'input': trajectory_input,
        'output': trajectory_output
    }
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    mat_filename = f"data_{timestamp}.mat"
    scipy.io.savemat(mat_filename, data_dict)
    tello.take_land()


def serial_read_thread(data_queue):
    ser = serial.Serial('COM6', 115200, timeout=0.001)
    while True:
        feedback = ser.readline().decode('utf-8')
        if len(feedback) > 24:
            data_list = feedback.strip('[ ]\n').split(', ')
            real_pos = [float(num) for num in data_list]
            data_queue.put(real_pos)


def main():
    # 创建队列用于线程间通信
    data_queue = Queue()

    # 创建两个线程,一个用于Tello控制,一个用于从串口读取数据
    tello_thread = threading.Thread(target=tello_identify_thread, args=(data_queue,))
    serial_thread = threading.Thread(target=serial_read_thread, args=(data_queue,))

    # 启动线程
    tello_thread.start()
    serial_thread.start()

    # 等待两个线程结束
    tello_thread.join()
    serial_thread.join()


if __name__ == '__main__':
    main()

实现效果

image-20240104155806470

参考

https://github.com/damiafuentes/DJITelloPy

https://dl.djicdn.com/downloads/RoboMaster%20TT/RoboMaster_TT_Swarm_Combo_User_Guide_v1.2_cnI.pdf