0x00 往期博文
《HelloOriginBot::使用Python编写Joy手柄功能包》
0x01 前言
OriginBot小车 基于 旭日X3pi 和 tros 开发,为了能够编写出更优秀的 'App' 释放小车的潜能,我们需要熟读 旭日X3pi 和 tros 的用户手册以便更好地理解本文内容;
在用户手册中,需重点了解以下内容:
- x3派面向生态开发者的产品定位,以及如何通过 Ai工具链 与 机器人开发平台(tros) 发挥5Tops算力;
- x3派系统安装、配置,了解 x3派 的硬件接口,体验基础图像采集和Ai推理功能;
- 了解 tros 基于 ros 新增的功能模块,例如 Hobot Sensor(机器人常用传感器)、Hobot Codec(视频编码器)、Hobot Render(Web端可视化)等;
- 体验 tros 图像采集->图像编解码->图像展示 功能;
- 了解 Boxs 算法仓库 和 Apps 应用实例;
- 学习 小车人体跟随(body_tracking) App 源码;
您可以通过 '地平线程序员奶爸' 的教程来深入学习 tros 手势控制app和人体追踪app:
开发者说 | 地平线程序员奶爸带你玩转机器人开发平台 —— 第一期 手势控制(附直播课视频)
开发者说 | 地平线程序员奶爸带你玩转机器人开发平台 —— 第二期 人体跟随(附直播课视频)
0x02 白话
说白了,旭日x3pi为我们机器人开发提供了硬件平台,tros为我们机器人开发提供了软件平台,二者需结合起来一起使用;
硬件平台为我们提供了很高的算力,而软件平台则为发挥硬件的潜能提供了丰富的 demo;
我们可以借助这些demo(Boxs和Apps),高效、便捷(复制、粘贴)的进行机器人开发,打造具有竞争力的智能机器人产品。
0x03 创建人脸追踪功能包
通过ssh连接上OriginBot小车,cd进 /userdata/dev_ws/src/originbot 目录,创建 originbot_face_tracking 功能包:
ros2 pkg create --build-type ament_python originbot_face_tracking
这里的命名规则与Originbot其他功能包一致
0x04 编写Launch文件
在功能包路径下新建 launch 目录,编写 ob_face_tracking.launch.py 文件欲实现的系统框图如下:
其中椭圆形框内为Node名,矩形框内为topic名。
节点名与功能对照下图:
- 传感节点使用了 tros Hobot Ssnros组件中的 /mipi_cam Node,参数指定使用 GC4663 摄像头,发布的图片格式和分辨率可以直接用于算法推理,其中通过共享内存的方式发布图片,可以极大地降低系统负载和传输延迟。
- 感知节点使用了 tros 中的 Boxs 算法仓库,订阅 Hobot Sensor(mipi_cam Node)发布的图像数据后,通过视觉算法进行推理,检测出人体关键部位;
- 人体检测和跟踪算法Node 订阅 Hobot Senso r发布的图像消息,利用 BPU 处理器进行AI推理,发布包含人体、人头、人脸、人手框和人体关键点检测结果的AI msg,并通过多目标跟踪(multi-target tracking,即MOT)功能,实现检测框的跟踪和ID分配。
- 交互节点,人体检测结果订阅节点 /ai_msg_sub_node,订阅人体检测和跟踪节点发布的 Ai msg 消息,执行跟踪策略。根据人脸在画面中的位置通过比例控制器计算出小车旋转角度;
- 控制节点/ob_tracking_Robot_CmdVel,发布消息名为'/cmd_vel'的命令控制小车做旋转运动;
- WEB展示,由 /hobot_codec 将 /mipi_cam 发布的 ’nv12‘ 格式的图像转换为 ’jpeg‘ 格式,以便 /websocket 展示。同时, /websocket 节点订阅 /hobot_mono2d_body_detection 消息用于图像渲染;
ob_face_tracking.launch.py 文件的代码如下:
import os
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import IncludeLaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from ament_index_python import get_package_share_directory
def generate_launch_description():
web_service_launch_include = IncludeLaunchDescription(
PythonLaunchDescriptionSource(
os.path.join(
get_package_share_directory('websocket'),
'launch/hobot_websocket_service.launch.py'))
)
return LaunchDescription([
web_service_launch_include,
# 启动图片发布pkg
Node(
package='mipi_cam',
executable='mipi_cam',
output='screen',
parameters=[
{"camera_calibration_file_path": "/opt/tros/lib/mipi_cam/config/GC4663_calibration.yaml"},
{"out_format": "nv12"},
{"image_width": 960},
{"image_height": 544},
{"io_method": "shared_mem"},
{"video_device": "GC4663"}
],
arguments=['--ros-args', '--log-level', 'error']
),
# 启动jpeg图片编码&发布pkg
Node(
package='hobot_codec',
executable='hobot_codec_republish',
output='screen',
parameters=[
{"channel": 1},
{"in_mode": "shared_mem"},
{"in_format": "nv12"},
{"out_mode": "ros"},
{"out_format": "jpeg"},
{"sub_topic": "/hbmem_img"},
{"pub_topic": "/image_jpeg"}
],
arguments=['--ros-args', '--log-level', 'error']
),
# 启动单目rgb人体、人头、人脸、人手框和人体关键点检测pkg
Node(
package='mono2d_body_detection',
executable='mono2d_body_detection',
output='screen',
parameters=[
{"ai_msg_pub_topic_name": "/hobot_mono2d_body_detection"},
{"model_file_name": "/opt/tros/lib/mono2d_body_detection/config/multitask_body_head_face_hand_kps_960x544.hbm"}
],
arguments=['--ros-args', '--log-level', 'error']
),
# 启动web展示pkg
Node(
package='websocket',
executable='websocket',
output='screen',
parameters=[
{"image_topic": "/image_jpeg"},
{"image_type": "mjpeg"},
{"smart_topic": "/hobot_mono2d_body_detection"}
],
arguments=['--ros-args', '--log-level', 'error']
),
# 启动人脸追踪节点
Node(
package='originbot_face_tracking',
executable='face_tracking',
output='screen',
parameters=[
],
)
])
0x05 功能包的实现
功能包中的 py 文件如下:
./oringinbot_face_tracking
├── common.py # 通用定义,如 图像宽、高等
├── face_tracking.py # 人脸追踪主程序
├── __init__.py
├── main.py # 程序入口,维护一个SingleThreadedExecutor,负责调起其他节点
├── robot_ctrl_node.py # 提供 robot_ctl(msg:Twist) 接口,负责发布 /cmd_vel 消息
├── smart_subscriber.py # 订阅 /hobot_mono2d_body_detection 消息,之后调用 face_tracking.FeedSmart(msg:PerceptionTargets) 灌入 Ai msg
└── time_helper.py # 时间计算工具
功能包程序设计如下:
略缩图如下:
main.py 代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'ning'
"""
@作者: ning
@说明: 人脸跟踪app程序入口, 调起其它程序接口.
创建并维护一个单线程执行器(SingleThreadedExecutor)将追踪器(TrackingManager)的所有节点添加到执行器中, 同时也将'智能消息订阅(smart_subscriber)'节点添加到执行器中;
接着, 将追踪管理器(TrackingManager)的FeedSmart函数绑定到智能消息订阅节点(smart_subscriber)的回调函数中,
并启动单线程执行器的主循环函数,使得执行器可以在单独的线程中执行所有节点的回调函数。
最后, 在程序退出之前, 释放追踪管理器的节点, 并关闭ROS2节点。
"""
import rclpy
from .smart_subscriber import SmartMsgSubscriber
from .face_tracking import TrackingManager
from functools import partial
def main():
print("\n\tThis is face tracking package.\n\n"
"Only rotate the robot without moving forward and backward.\n"
"============================================\n")
# 初始化 ROS2 节点
rclpy.init()
# 创建一个节点执行器
node_executor = rclpy.executors.SingleThreadedExecutor()
# 获取 TrackingManger 单例的所有Nodes;
nodes = TrackingManager.Instance().GetNodes()
for node in nodes:
node_executor.add_node(node) # 将每个节点添加到执行器中,以便于执行器可以在单独的线程中执行回调函数;
# 将追踪管理器的 FeedSmart 函数绑定到一个回调函数中,将追踪管理器的单例对象作为第一个参数传入,订阅的消息作为第二个参数传入。
smart_msg_subscriber = SmartMsgSubscriber(
"ai_msg_sub_node",
partial(TrackingManager.FeedSmart, TrackingManager.Instance())) # 表示回调函数中的第一个参数,即订阅的消息。
node_executor.add_node(smart_msg_subscriber) # 添加智能消息订阅节点..
node_executor.spin() # 启动执行器的主循环。该函数会一直阻塞,直到执行器被停止。
# release node before shutdown!
TrackingManager.Instance().Release()
rclpy.shutdown() # 关闭ROS2 Python接口
print("tracking node exit!")
# 测试用
if __name__ == '__main__':
main()
smart_subscriber.py 代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'ning'
"""
@作者: ning
@说明:
"""
import rclpy
from rclpy.node import Node
from ai_msgs.msg import PerceptionTargets
from typing import Callable # 函数类型 func
class SmartMsgSubscriber(Node):
def __init__(self, node_name: str, smart_cb: Callable[[PerceptionTargets], None] = None):
super().__init__(node_name=node_name)
# main.py 传递来的 回调函数
self.smart_cb = smart_cb
# 声明一个ROS参数, 定义'智能订阅的话题名称'
self.declare_parameter('ai_msg_sub_topic_name',
'/hobot_mono2d_body_detection') # 默认 hobot_hand_gesture_detection 可选 hobot_mono2d_body_detection
self.ai_msg_sub_topic_name = self.get_parameter(
'ai_msg_sub_topic_name').value
self.get_logger().info(
f"Parameter:\nai_msg_sub_topic_name: {self.ai_msg_sub_topic_name}")
# 智能订阅,其实就是可配置订阅话题名称;
self.subscription = self.create_subscription(
PerceptionTargets,
self.ai_msg_sub_topic_name,
self.topic_callback,
10
)
# 智能订阅的回调函数, 输出信息后调用 smart_cb
def topic_callback(self, msg:PerceptionTargets):
ss = "Recved ai msg, frame_id: {}, stamp: {}_{}, targets size: {}\n".format(
msg.header.frame_id, msg.header.stamp.sec, msg.header.stamp.nanosec, len(msg.targets))
for tar in msg.targets:
ss += "\t track_id: {}, type: {}, has roi num: {}".format(
tar.track_id, tar.type, len(tar.rois))
for roi in tar.rois:
ss += ", roi type: {}".format(roi.type)
ss += ", has attr num: {}".format(len(tar.attributes))
for attr in tar.attributes:
ss += ", attr type: {}, val: {}".format(attr.type, attr.value)
ss += "\n"
self.get_logger().info(ss)
# 处理消息
if self.smart_cb:
self.smart_cb(msg)
else:
self.get_logger().warn("smart_cb was not set")
if __name__ == '__main__':
rclpy.init()
node = SmartMsgSubscriber(node_name='smart_msg_subscriber')
rclpy.spin(node)
rclpy.shutdown()
face_tracking.py 代码如下:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'ning'
"""
@作者: ning
@说明: 人脸跟踪程序;
"""
import rclpy
import threading
import queue
from simple_pid import PID
from geometry_msgs.msg import Twist
from ai_msgs.msg import PerceptionTargets
from .common import *
from .time_helper import TimeHelper
from .robot_ctrl_node import RobotCmdVelNode
class TrackingManager:
__instance = None # 类变量,用于存储单例实例
@classmethod
def Instance(cls):
if not cls.__instance:
cls.__instance = cls()
return cls.__instance
# 释放资源
def Release(self):
print("TrackingManager release")
self.start_ = False
if self.smart_process_task_ and self.smart_process_task_.is_alive():
self.smart_process_task_.join()
self.smart_process_task_ = None
self.param_node_ = None
self.robot_cmdvel_node_ = None
def __init__(self):
# 跟踪器是否已启动?
self.start_ = True
# 实例化PID控制器
self.pid = PID(5.0, 0.0, 0.0, setpoint=0)
# 设置采样时间
self.pid.sample_time = 0.1
# 设置输出限制
self.pid.output_limits = (-10, 10)
# 设置pid死区
self.offset_dead_block = 0.1
# 实例化 common.py::TrackCfg, 存储跟踪配置信息
self.track_cfg_ = TrackCfg()
# TODO(ning 2023.03.12): 先解析数据, 最后再来控制小车;
self.robot_cmdvel_node_ = RobotCmdVelNode("ob_tracking_RobotCmdVel")
# 线程, 用于从 queue 中获取帧消息;
self.smart_process_task_ = None
self.queue_len_limit_ = 2
self.smart_queue_ = queue.Queue(maxsize=self.queue_len_limit_)
if not self.smart_process_task_:
self.smart_process_task_ = threading.Thread(
target=self.smart_process_task)
self.smart_process_task_.start()
def smart_process_task(self):
while self.start_ and rclpy.ok():
# 直接get,
smart_frame = self.smart_queue_.get()
# 执行跟踪策略
self.RunTrackingStrategy(smart_frame)
def FeedSmart(self, msg: PerceptionTargets):
'''
接收 smart_sub 的 msg
'''
if not rclpy.ok():
return
# 如果队列已满, 队列中第一帧未处理完成, 则将第一帧出栈, 保证队列中的帧是最新的;
if self.smart_queue_.full():
self.smart_queue_.get()
# python 的队列是线程安全的
self.smart_queue_.put(msg) # 将ai msg消息放入队列中
def RunTrackingStrategy(self, msg: PerceptionTargets):
''' 运行跟踪策略, 接收外部的目标检测结果作为输入。 '''
# 开始追踪时间戳
start_tracking_ts = TimeHelper.GetCurrentTimestampMillSec()
self.ProcessSmart(msg)
# 当前时间戳
present_tracking_ts = TimeHelper.GetCurrentTimestampMillSec()
# 输出跟踪策略花费的时间;
rclpy.logging.get_logger('TrackingManager').info(
"Run TrackingStrategy time ms cost: %d" % (
present_tracking_ts - start_tracking_ts)
)
def ProcessSmart(self, msg: PerceptionTargets):
# 解析 Ai msg 数据, 找出人脸, 计算PID误差值;
err = 0.0
for target in msg.targets:
if target.rois[0].type == 'face':
rect = target.rois[0].rect
# (rect.x_offset + rect.width / 2) # 计算人脸x坐标中心点;
# ((rect.x_offset + rect.width / 2) / track_cfg_.img_width - 0.5) * 2 # 归一化处理,将人脸坐标归一到 NDC空间 [-1, 1];
err = float(((rect.x_offset + rect.width / 2) / self.track_cfg_.img_width - 0.5) * 2)
break
# 死区
if abs(err) < self.offset_dead_block:
err = 0
out_ = self.pid(err)
rclpy.logging.get_logger('TrackingManager').info("in=%.2f out=%.2f" % (err, out_))
# 发布速度话题
twist = Twist()
twist.angular.z = out_
self.robot_cmdvel_node_.robot_ctl(twist)
def GetNodes(self):
''' 取所有节点 '''
node_ptrs = []
# node_ptrs.append(self.param_node_)
node_ptrs.append(self.robot_cmdvel_node_)
return node_ptrs
def GetTrackCfg(self):
''' 取跟踪配置 '''
return self.track_cfg_
0x06 程序设计难点
难点1:Python 多线程 Queue:
Python Queue是线程安全的不同于C++,因此可以直接使用get()、put() 而不需要使用锁Lock();
使用 多线程+消息队列 订阅 Ai msg,可以避免单线程时等待 Ai msg 产生的延迟?
难点2:PID人脸追踪器:
- PID(proportion integration differentiation) 控制器只了P(proportion),因此为比例控制器,类似于一次函数;
-
PID(5.0, 0.0, 0.0, setpoint=0) # P 经过调试后被设置为 5,setpoint 目标点为 0,即 NDC 空间下图像的中点;
- pid.sample_time = 0.1 # 设置采样率可以简化 PID 的计算,推荐设置;采样率与函数的调用频率无关;
PerceptionTargets.msg 接口如下:
# 感知结果
# 消息头
std_msgs/Header header
# 感知结果的处理帧率
# fps val is invalid if fps is less than 0
int16 fps
# 性能统计信息,比如记录每个模型推理的耗时
Perf[] perfs
# 感知目标集合
Target[] targets
# 消失目标集合
Target[] disappeared_targets
Target.msg 接口如下:
# 目标消息 Targets
# 目标类型名称,如:人、车、动物、物体
# person/car/object/animal
string type
# 目标跟踪ID号
uint64 track_id
# 检测框
Roi[] rois
# 属性
Attribute[] attributes
# 关键点
Point[] points
# 跟踪目标抓拍图、特征、特征的底库检索结果信息
Capture[] captures
Roi.msg 接口如下:
# roi感知消息
# 如:人体检测框、人头检测框、人脸检测框、人手检测框
# roi类型
# body/head/face/hand
string type
# 检测框
sensor_msgs/RegionOfInterest rect
# 检测结果的置信度
float32 confidence
BUG:经解析后,笔者认为这里有一个数据结构错误,但不影响程序使用;
PerceptionTargets.msg 中有一个 Target[] targets 数组,其中这个 target 的类型可以是人、车、动物、物体,这个 target 下有一个 Roi[] rois,这个 Roi 包含人体框、人头框、人脸框、人手框多个元素;
然而在实际使用中发现:
rois[]数组中只包含一个元素,数据伪代码如下:
target1 = {
'type': 'person',
'track_id': '',
'rois': [
{
'type' : 'body',
'rect':[],
'confidence': []
}
],
'attributes': [],
'points': [],
'captures': []
}
target2 = {
'type': 'person',
'track_id': '',
'rois': [
{
'type' : 'face',
'rect':[],
'confidence': []
}
],
'attributes': [],
'points': [],
'captures': []
}
target3 = {
'type': 'person',
'track_id': '',
'rois': [
{
'type' : 'hand',
'rect':[],
'confidence': []
}
],
'attributes': [],
'points': [],
'captures': []
}
PerceptionTargets = {
'header': '',
'fps': '',
'targets': [
target1,
target2,
target3,
],
'disappeared_targets': [
]
}
我认为正确的数据结构如下:
roi1 = {
{
'type': 'body',
'rect': [],
'confidence': []
}
}
roi2 = {
{
'type': 'face',
'rect': [],
'confidence': []
}
}
roi3 = {
{
'type': 'hand',
'rect': [],
'confidence': []
}
}
target1 = {
'type': 'person',
'track_id': '',
'rois': [
roi1,
roi2,
roi3
],
'attributes': [],
'points': [],
'captures': []
}
target2 = {
'type': 'car',
'track_id': '',
'rois': [
{
'type': '车牌',
'rect': [],
'confidence': []
}
],
'attributes': [],
'points': [],
'captures': []
}
PerceptionTargets = {
'header': '',
'fps': '',
'targets': [
target1,
target2,
],
'disappeared_targets': [
]
}
正确 √
target.type == person
rois[人体,人脸,人手,人头, ...]
错误 ❌
target.type == person
rois[人体]
target.type == person
rois[人头]
target.type == person
rois[人脸]
0x06 编译运行
ssh连接小车,安装 py 功能包依赖库simple_pid:
1、使用 pip 安装 simple_pid 库:
pip3 install simple_pid
2、执行功能包根目录下的 .\install.sh 脚本自动安装依赖:
.\install.sh
cd 进 /userdata/dev_ws/ 目录,执行如下命令编译功能包;
colcon build --packages-select originbot_face_tracking
使用 Launch 文件启动人脸追踪功能:
ros2 launch originbot_face_tracking ob_face_tracking.launch.py
启动小车底盘:
ros2 launch originbot_bringup originbot.launch.py
enjoy it!
0x07 执行效果
O
评论(1)
您还未登录,请登录后发表或查看评论