0x00 前情回顾
0x01 实验概述
在学习《ROS2入门21讲》,掌握ROS2的 系统架构 及 话题、通信接口、服务、动作 等 核心概念 之后,小试牛刀。
编写Python代码实现 originbot_teleop_node 节点,实现如下功能:
@说明: 通过手柄控制 OriginBot 小车, 增加按键控制功能;
- 按下 'Y' 键: 请求 /originbot_led svr, LED闪烁3s;
- 按下 'X' 键: 请求 /originbot_buzzer svr, 蜂鸣器间隔作响3s;
- 按下 'START' 键: 发布 /cmd_vel topic, 小车开始做圆周运动;
- 按下 'A' 键: 订阅 /originbot_status topic, 输出状态信息, 再次按下取消;
- 按下 'B' 键: 订阅 /odom topic, 输出里程计消息,再次按下取消;
在实现该节点时,尝试使用了 asyncio 库。
0x02 实验手柄
无线手柄,USB接收器。某宝上有很多,大概¥30一套;
0x03 手柄测试
USB接收器插入PC端,小车与PC组成局域网,实现 ROS 分布式通信。
注:如果使用的是 Ubuntu 虚拟机
- 此处PC端如果使用虚拟机运行Ubuntu系统,为确保机器人与Ubuntu系统处于同一局域网中,需要将虚拟机的网络设置为桥接模式:
- 虚拟机需要手动选择欲连接的宿主机USB设备,此处选择手柄;
在 PC 端输入如下命令安装 '手柄测试软件' :
sudo apt install jstest-gtk
执行 jstest-gtk, 手柄测试软件,并选择自己的手柄,
观察并记录,按键响应。
最后,得出如下手柄按键映射表 ( 按键 或 轴 与数组索引之间的映射 );
# 手柄按键映射表
# lr : left, right
# ud : up, down
# R2_f: froce
KEYMAP = {'axes': ['L3_lr', 'L3_ud', 'R3_lr', 'R3_ud', 'R2_f', 'L2_f', 'HALT_lr', 'HALT_ud'],
'btns': ['A', 'B', '', 'X', 'Y', '', 'L1', 'R1', 'L2', 'R2', 'SELECT', 'START', '', 'L3', 'R3']}
0x04 安装相应的ROS功能包
在 PC 端,输入如下命令安装 ROS手柄驱动功能包 'joy_linux' :
sudo apt install ros-humble-joy-linux # 安装手柄的驱动包
在 PC 端,输入如下命令安装 通用机器人摇杆控制功能包 'teleop-twist-joy' :
sudo apt install ros-humble-teleop-twist-joy # 安装 通用机器人摇杆控制功能包
teleop-twist-joy 是在 joy_linux 功能包的基础之上,通过手柄摇杆向外发布 geometry_msgs/msg/Twist 'cmd_vel' 速度消息;
在 PC 端,依次执行如下命令,观察 /joy 话题输出;
ros2 launch teleop_twist_joy teleop-launch.py joy_dev:='dev/input/js1' # 选择输入设备,启动 joy 功能包
ros2 topic echo /joy # 输出 /joy 话题
此时,按动手柄按键或摇杆,数据有响应且与jstest-gtk得出的手柄按键映射表一致;
0x05 编写手柄按键响应代码
这里我只希望手柄按键按下时响应一次;编写手柄类:
class MyController(object):
""" 类属性 """
# 手柄按键映射表
# lr : left, right
# ud : up, down
# R2_f: froce
KEYMAP = {'axes': ['L3_lr', 'L3_ud', 'R3_lr', 'R3_ud', 'R2_f', 'L2_f', 'HALT_lr', 'HALT_ud'],
'btns': ['A', 'B', '', 'X', 'Y', '', 'L1', 'R1', 'L2', 'R2', 'SELECT', 'START', '', 'L3', 'R3']}
AXES_NUM = KEYMAP['axes'].__len__() # 摇杆数量
BTNS_NUM = KEYMAP['btns'].__len__() # 按键数量
# 内部类
class Btn(object):
def __init__(self, name):
"""
初始化类的属性
"""
if name == '' or name not in MyController.KEYMAP['btns']:
raise ValueError(
"Check keyname! keyname must in 'KEYMAP' and not ''.")
self.name = name # 按键名称,必须在 KEYMAP中 中注册;
self.keymap = MyController.KEYMAP['btns'].index(name) # 按键索引
self.sta = False # 当前按键状态,False为抬起,True按下,对应 /joy 话题状态;
self.bak = self.sta # 上次按键状态, 用于检测按键按下/抬起.
# MyController
def __init__(self):
# 注册手柄 轴 及 按键, 会在 /joy 话题 callback 时遍历.
self.axes = []
self.btns = [self.Btn('X'),
self.Btn('Y'),
self.Btn('A'),
self.Btn('B'),
self.Btn('START'),]
@staticmethod
def on_btn_pressed_listener(node: Node, btn):
if node != None:
node.get_logger().info(btn.name + ' pressed!')
if btn.name == 'X':
# 蜂鸣器间隔作响3s
pass
elif btn.name == 'Y':
# 小车 LED 闪烁3s
pass
elif btn.name == 'START':
# OriginBOt机器人画圆;
pass
elif btn.name == 'A':
# 输出当前机器人状态;
pass
elif btn.name == 'B':
# 输出一次里程计信息;
pass
@staticmethod
def on_btn_released_listener(node: Node, btn):
if node != None:
node.get_logger().info(btn.name + ' released!')
if btn.name == 'X':
pass
elif btn.name == 'Y':
pass
0x06 编写ROS节点代码
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
import asyncio
class TeleopSimNode(Node):
# TeleopSimNode
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
pass # 功能实现
async def ros_loop(node):
while rclpy.ok():
rclpy.spin_once(node)
await asyncio.sleep(1e-4) # 短暂挂起
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = TeleopSimNode("node_originbot_teleop") # 创建ROS2节点对象并进行初始化
asyncio.run(ros_loop(node))
# 测试用
if __name__ == '__main__':
main()
使用asyncio协程的一次尝试,参考: m2-farzan/ros2-asyncio: How to use ROS 2 (rclpy) with Python async (github.com)
rclpy 内部似乎也运行了一个event_loop,需要不停的调用rclpy.spin()检查事件回调,不要在call_back中执行耗时操作或使用同步call,否则会产生死锁。
0x06 订阅/joy话题
在 TeleopSimNode 节点中编写了一个,__JoySub的内部类,并在 TeleopSimNode.__init__ 中初始化;
在 /joy 话题的回调函数中,遍历手柄按钮,当按钮改变时,回调其监听函数;
看一看代码,并思考一下。此时,是否已经实现了按键单击的代码框架了呢?
from sensor_msgs.msg import Joy
class TeleopSimNode(Node):
# 内部类 /joy话题订阅者
class __JoySub():
def __init__(self, node):
self.node = node
self.ctrl = MyController()
self.joy_sub = self.node.create_subscription(
Joy, "/joy", self.joy_callback, 10)
def joy_callback(self, joy_msg):
for btn in self.ctrl.btns:
btn.sta = joy_msg.buttons[btn.keymap]
if btn.sta != btn.bak: # 按键状态改变
if btn.sta:
self.ctrl.on_btn_pressed_listener(
self.node, btn) # 按键按下
else:
self.ctrl.on_btn_released_listener(
self.node, btn) # 按键弹起
btn.bak = btn.sta
def __del__(self):
self.node.destroy_subscription(self.joy_sub)
# TeleopSimNode
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.joy_sub = self.__JoySub(self) # 实例化 joy 话题订阅者
0x06 请求小车 蜂鸣器 & LED 服务
编译OriginBot机器人通信接口;
将小车源码中 /originbot/originbot_msgs 功能包,拷贝到PC端 dev_ws工作空间,随后colcon build编译;
from originbot_msgs.srv import OriginbotBuzzer, OriginbotLed # 小车的服务接口
使用 async / await 处理 服务 请求及回调;
class TeleopSimNode(Node):
# 内部 服务请求客户端类
class __SvrClient():
def __init__(self, node):
self.node = node
self.buzzer_cl = node.create_client(
OriginbotBuzzer, '/originbot_buzzer')
self.led_cl = node.create_client(OriginbotLed, 'originbot_led')
async def req_buzzer(self, buzzer_on):
# 循环等待服务器端成功启动
while not self.buzzer_cl.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
# BUG : 如果小车底盘程序未启动会进入死循环;
req = OriginbotBuzzer.Request()
req.on = buzzer_on
try:
response = await self.buzzer_cl.call_async(req)
except Exception as e:
self.node.get_logger().info(
'/originbot_buzzer Service call failed %r' % (e,))
else:
self.node.get_logger().info('/originbot_buzzer Service call Success!')
async def req_led(self, led_on):
# 循环等待服务器端成功启动
while not self.led_cl.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
# BUG : 如果小车底盘程序未启动会进入死循环;
req = OriginbotLed.Request()
req.on = led_on
try:
response = await self.led_cl.call_async(req)
except Exception as e:
self.node.get_logger().info(
'/originbot_led Service call failed %r' % (e,))
else:
self.node.get_logger().info('/originbot_led Service call Success!')
# TeleopSimNode
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.joy_sub = self.__JoySub(self) # 实例化 joy 话题订阅者
self.cl = self.__SvrClient(self) # 实例化 服务请求客户端
在 MyController.on_btn_pressed_listener 中添加逻辑代码:
class MyController(object):
...
@staticmethod
def on_btn_pressed_listener(node: Node, btn):
if node != None:
node.get_logger().info(btn.name + ' pressed!')
if btn.name == 'X':
# 蜂鸣器间隔作响3s
async def t_buzzer():
for _x in range(6): # 3*2=6 3个周期,一个周期两个状态
asyncio.create_task(node.cl.req_buzzer(
True if _x % 2 == 0 else False))
await asyncio.sleep(1)
asyncio.create_task(t_buzzer())
elif btn.name == 'Y':
# 小车 LED 闪烁3s
async def t_led():
for _x in range(6):
asyncio.create_task(node.cl.req_led(
True if _x % 2 == 0 else False))
await asyncio.sleep(0.5)
asyncio.create_task(t_led())
...
实现效果:
0x07 发布Twist话题使小车做圆周运动
当按下'START'按键时小车开始做圆周运动,再次按下时停止;
from geometry_msgs.msg import Twist # 速度话题的消息
class BtnHandler():
__draw = False # 画圆 标志位
@classmethod
async def draw_circle(cls, node: Node):
cls.__draw = not cls.__draw
while (cls.__draw):
twist = Twist()
twist.linear.x = 0.2
twist.angular.z = 0.8
node.vel_pub.publish(twist)
node.get_logger().info('Publishing: "linear: %0.2f, angular: %0.2f"' %
(twist.linear.x, twist.angular.z))
await asyncio.sleep(0.5)
else:
twist = Twist()
twist.linear.x = 0.0
twist.angular.z = 0.0
node.vel_pub.publish(twist)
在 MyController.on_btn_pressed_listener 中添加逻辑代码:
if btn.name == 'A':
...
elif btn.name == 'START':
# OriginBOt机器人画圆;
asyncio.create_task(BtnHandler.draw_circle(node))
...
实现效果:
0x08 订阅 里程计odom & 小车状态status 话题
from originbot_msgs.msg import OriginbotStatus
from nav_msgs.msg import Odometry # 里程计消息
- 只显示一次的写法:
class BtnHandler():
...
@classmethod
async def show_status(cls, node: Node):
async def listener_callback(msg):
node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on: "%d"'
% (msg.battery_voltage, msg.buzzer_on, msg.led_on))
node.destroy_subscription(originbot_status_sub) # 只显示一次
originbot_status_sub = node.create_subscription(
OriginbotStatus, "originbot_status", listener_callback, 10)
@classmethod
async def show_odom(cls, node: Node):
async def odom_cb(msg):
node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
% (msg.pose.pose.position.x, msg.pose.pose.position.y))
node.destroy_subscription(odom_sub)
# 只显示一次里程计信息;
odom_sub = node.create_subscription(
Odometry, "odom", odom_cb, 10)
在 MyController.on_btn_pressed_listener 中添加逻辑代码:
...
elif btn.name == 'B':
asyncio.create_task(BtnHandler.show_odom(node)) # 输出一次里程计信息;
...
- 重复显示的写法
class TeleopSimNode(Node):
...
# 内部类, 蜂鸣器 发布者
class __Sub():
def __init__(self, node: Node) -> None:
self.node = node
# 小车状态
self.originbot_status_show = False
self.originbot_status_sub = node.create_subscription(
OriginbotStatus, "originbot_status", self.originbot_status_cb, 10)
# 小车 里程计
self.odom_show = False
self.odom_sub = node.create_subscription(
Odometry, "odom", self.odom_cb, 10)
def odom_cb(self, msg):
if self.odom_show:
self.node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
% (msg.pose.pose.position.x, msg.pose.pose.position.y))
def originbot_status_cb(self, msg):
if self.originbot_status_show:
self.node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on: "%d"'
% (msg.battery_voltage, msg.buzzer_on, msg.led_on))
...
实现效果:
0x09 编写Launch文件
配置teleop_twist_joy节点参数,
teleop_twist_joy_node:
ros__parameters:
axis_linear: # 线速度
x: 1
scale_linear: # 与 axis_linear 相乘
x: 0.7
scale_linear_turbo: # 与 axis_linear 相乘, 涡轮增压
x: 2.0
axis_angular: # 角速度
yaw: 0
scale_angular: # 与 axis_angular 相乘
yaw: 1.0
enable_button: 8 # L2 shoulder button, 配置常速使能按钮
enable_turbo_button: 9 # R2, 配置涡轮增压使能按钮
import os
import launch
import launch_ros.actions
from launch import LaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.conditions import IfCondition
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
# 找到配置文件的完整路径
joy_config = os.path.join(
get_package_share_directory('learning_launch'),
'config', 'my_ctrl.config.yaml'
)
# BUG 数据类型错误,导致参数文件无法加载;
return LaunchDescription([
launch.actions.DeclareLaunchArgument(name='use_turtulesim', default_value='false'),
# 添加 小乌龟 节点
launch_ros.actions.Node(
package='turtlesim',
executable='turtlesim_node',
output='screen',
condition=IfCondition(launch.substitutions.LaunchConfiguration('use_turtulesim')),
remappings=[("/turtle1/cmd_vel", "/cmd_vel")]
),
# 添加 originbot_teleop_node 节点
launch_ros.actions.Node(
package='learning_node',
executable='node_orginbot_teleop',
output='screen',
),
# 添加 遥控 启动文件
launch.actions.IncludeLaunchDescription(
PythonLaunchDescriptionSource(
[
os.path.join(
get_package_share_directory('teleop_twist_joy'),
'launch', 'teleop-launch.py'
)
]
),
launch_arguments={'joy_dev': 'dev/input/js1',
'config_filepath': joy_config}.items()
)
])
在 launch_arguments 中设置手柄输入设备 'joy_dev'。
0x0A 完整代码
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@作者: ning
@说明: 通过手柄控制 OriginBot 小车, 增加按键控制功能;
按下 'Y' 键: 请求 /originbot_led svr, LED闪烁3s;
按下 'X' 键: 请求 /originbot_buzzer svr, 蜂鸣器间隔作响3s;
按下 'START' 键: 发布 /cmd_vel topic, 小车开始做圆周运动;
按下 'A' 键: 订阅 /originbot_status topic, 输出状态信息, 再次按下取消;
按下 'B' 键: 订阅 /odom topic, 输出里程计消息,再次按下取消;
"""
import rclpy # ROS2 Python接口库
from rclpy.node import Node # ROS2 节点类
import asyncio
from sensor_msgs.msg import Joy
from turtlesim.msg import Pose # 测试用
from originbot_msgs.srv import OriginbotBuzzer, OriginbotLed # 自定义的服务接口
from originbot_msgs.msg import OriginbotStatus
from geometry_msgs.msg import Twist # 速度话题的消息
from nav_msgs.msg import Odometry # 里程计消息
class BtnHandler():
__draw = False # 画圆 标志位
@classmethod
async def draw_circle(cls, node: Node):
cls.__draw = not cls.__draw
while (cls.__draw):
twist = Twist()
twist.linear.x = 0.2
twist.angular.z = 0.8
node.vel_pub.publish(twist)
node.get_logger().info('Publishing: "linear: %0.2f, angular: %0.2f"' %
(twist.linear.x, twist.angular.z))
await asyncio.sleep(0.5)
else:
twist = Twist()
twist.linear.x = 0.0
twist.angular.z = 0.0
node.vel_pub.publish(twist)
@classmethod
async def show_status(cls, node: Node):
async def listener_callback(msg):
node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on: "%d"'
% (msg.battery_voltage, msg.buzzer_on, msg.led_on))
node.destroy_subscription(originbot_status_sub) # 只显示一次
originbot_status_sub = node.create_subscription(
OriginbotStatus, "originbot_status", listener_callback, 10)
@classmethod
async def show_pose(cls, node: Node):
# 只显示一次小乌龟 pose 状态
async def listener_callback(msg):
node.get_logger().info('x: "%0.2f", y: "%0.2f", theta: "%0.2f"'
% (msg.x, msg.y, msg.theta))
node.destroy_subscription(turtle1_pose_sub)
turtle1_pose_sub = node.create_subscription(
Pose, "/turtle1/pose", listener_callback, 10)
@classmethod
async def show_odom(cls, node: Node):
async def odom_cb(msg):
node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
% (msg.pose.pose.position.x, msg.pose.pose.position.y))
node.destroy_subscription(odom_sub)
# 只显示一次里程计信息;
odom_sub = node.create_subscription(
Odometry, "odom", odom_cb, 10)
class MyController(object):
""" 类属性 """
# 手柄按键映射表
# lr : left, right
# ud : up, down
# R2_f: froce
KEYMAP = {'axes': ['L3_lr', 'L3_ud', 'R3_lr', 'R3_ud', 'R2_f', 'L2_f', 'HALT_lr', 'HALT_ud'],
'btns': ['A', 'B', '', 'X', 'Y', '', 'L1', 'R1', 'L2', 'R2', 'SELECT', 'START', '', 'L3', 'R3']}
AXES_NUM = KEYMAP['axes'].__len__() # 摇杆数量
BTNS_NUM = KEYMAP['btns'].__len__() # 按键数量
# 内部类
class Btn(object):
def __init__(self, name):
"""
初始化类的属性
"""
if name == '' or name not in MyController.KEYMAP['btns']:
raise ValueError(
"Check keyname! keyname must in 'KEYMAP' and not ''.")
self.name = name # 按键名称,必须在 KEYMAP中 中注册;
self.keymap = MyController.KEYMAP['btns'].index(name) # 按键索引
self.sta = False # 当前按键状态,False为抬起,True按下,对应 /joy 话题状态;
self.bak = self.sta # 上次按键状态, 用于检测按键按下/抬起.
# MyController
def __init__(self):
# 注册手柄 轴 及 按键, 会在 /joy 话题 callback 时遍历.
self.axes = []
self.btns = [self.Btn('X'),
self.Btn('Y'),
self.Btn('A'),
self.Btn('B'),
self.Btn('START'),]
@staticmethod
def on_btn_pressed_listener(node: Node, btn):
if node != None:
node.get_logger().info(btn.name + ' pressed!')
if btn.name == 'X':
# 蜂鸣器间隔作响3s
async def t_buzzer():
for _x in range(6): # 3*2=6 3个周期,一个周期两个状态
asyncio.create_task(node.cl.req_buzzer(
True if _x % 2 == 0 else False))
await asyncio.sleep(1)
asyncio.create_task(t_buzzer())
elif btn.name == 'Y':
# 小车 LED 闪烁3s
async def t_led():
for _x in range(6):
asyncio.create_task(node.cl.req_led(
True if _x % 2 == 0 else False))
await asyncio.sleep(0.5)
asyncio.create_task(t_led())
elif btn.name == 'START':
asyncio.create_task(BtnHandler.draw_circle(node)) # OriginBOt机器人画圆;
elif btn.name == 'A':
node.sub.originbot_status_show = not node.sub.originbot_status_show # 输出当前机器人状态;
elif btn.name == 'B':
asyncio.create_task(BtnHandler.show_odom(node)) # 输出一次里程计信息;
@staticmethod
def on_btn_released_listener(node: Node, btn):
if node != None:
node.get_logger().info(btn.name + ' released!')
if btn.name == 'X':
pass
elif btn.name == 'Y':
pass
class TeleopSimNode(Node):
# 内部 /joy话题订阅者
class __JoySub():
def __init__(self, node):
self.node = node
self.ctrl = MyController()
self.joy_sub = self.node.create_subscription(
Joy, "/joy", self.joy_callback, 10)
def joy_callback(self, joy_msg):
for btn in self.ctrl.btns:
btn.sta = joy_msg.buttons[btn.keymap]
if btn.sta != btn.bak: # 按键状态改变
if btn.sta:
self.ctrl.on_btn_pressed_listener(
self.node, btn) # 按键按下
else:
self.ctrl.on_btn_released_listener(
self.node, btn) # 按键弹起
btn.bak = btn.sta
def __del__(self):
self.node.destroy_subscription(self.joy_sub)
# 内部类, 蜂鸣器 发布者
class __Sub():
def __init__(self, node: Node) -> None:
self.node = node
# 小车状态
self.originbot_status_show = False
self.originbot_status_sub = node.create_subscription(
OriginbotStatus, "originbot_status", self.originbot_status_cb, 10)
# 小乌龟pose测试用
self.turtle_pose_show = False
self.turtle_pose_sub = node.create_subscription(
Pose, "/turtle1/pose", self.turtle_pose_cb, 10)
# 小车 里程计
self.odom_show = False
self.odom_sub = node.create_subscription(
Odometry, "odom", self.odom_cb, 10)
def odom_cb(self, msg):
if self.odom_show:
self.node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
% (msg.pose.pose.position.x, msg.pose.pose.position.y))
def turtle_pose_cb(self, msg):
if self.turtle_pose_show:
self.node.get_logger().info('x: "%0.2f", y: "%0.2f", theta: "%0.2f"'
% (msg.x, msg.y, msg.theta))
def originbot_status_cb(self, msg):
if self.originbot_status_show:
self.node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on: "%d"'
% (msg.battery_voltage, msg.buzzer_on, msg.led_on))
# 内部 服务请求客户端类
class __SvrClient():
def __init__(self, node):
self.node = node
self.buzzer_cl = node.create_client(
OriginbotBuzzer, '/originbot_buzzer')
self.led_cl = node.create_client(OriginbotLed, 'originbot_led')
async def req_buzzer(self, buzzer_on):
# 循环等待服务器端成功启动
while not self.buzzer_cl.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
# BUG : 如果小车底盘程序未启动会进入死循环;
req = OriginbotBuzzer.Request()
req.on = buzzer_on
try:
response = await self.buzzer_cl.call_async(req)
except Exception as e:
self.node.get_logger().info(
'/originbot_buzzer Service call failed %r' % (e,))
else:
self.node.get_logger().info('/originbot_buzzer Service call Success!')
async def req_led(self, led_on):
# 循环等待服务器端成功启动
while not self.led_cl.wait_for_service(timeout_sec=1.0):
self.node.get_logger().info('service not available, waiting again...')
# BUG : 如果小车底盘程序未启动会进入死循环;
req = OriginbotLed.Request()
req.on = led_on
try:
response = await self.led_cl.call_async(req)
except Exception as e:
self.node.get_logger().info(
'/originbot_led Service call failed %r' % (e,))
else:
self.node.get_logger().info('/originbot_led Service call Success!')
# TeleopSimNode
def __init__(self, name):
super().__init__(name) # ROS2节点父类初始化
self.joy_sub = self.__JoySub(self)
self.cl = self.__SvrClient(self)
self.sub = self.__Sub(self)
self.vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)
async def ros_loop(node):
while rclpy.ok():
rclpy.spin_once(node)
await asyncio.sleep(1e-4) # 短暂挂起
node.destroy_node() # 销毁节点对象
rclpy.shutdown() # 关闭ROS2 Python接口
def main(args=None): # ROS2节点主入口main函数
rclpy.init(args=args) # ROS2 Python接口初始化
node = TeleopSimNode("node_originbot_teleop") # 创建ROS2节点对象并进行初始化
asyncio.run(ros_loop(node))
# 测试用
if __name__ == '__main__':
main()
评论(1)
您还未登录,请登录后发表或查看评论