0x00 前情回顾

OriginBot我的第一辆智能小车

使用(PS2)手柄控制OriginBot小车

0x01 实验概述

在学习《ROS2入门21讲》,掌握ROS2的 系统架构 话题、通信接口、服务、动作 核心概念 之后,小试牛刀。

编写Python代码实现 originbot_teleop_node 节点,实现如下功能:

@说明: 通过手柄控制 OriginBot 小车, 增加按键控制功能;  

  • 按下 'Y'     键:        请求    /originbot_led         svr,       LED闪烁3s;    
  • 按下 'X'     键:        请求    /originbot_buzzer   svr,       蜂鸣器间隔作响3s;    
  • 按下 'START' 键:   发布    /cmd_vel                topic,    小车开始做圆周运动;     
  • 按下 'A'     键:        订阅    /originbot_status    topic,    输出状态信息,  再次按下取消;    
  • 按下 'B'     键:        订阅    /odom                     topic,   输出里程计消息,再次按下取消;

在实现该节点时,尝试使用了 asyncio 库。

0x02 实验手柄

无线手柄,USB接收器。某宝上有很多,大概¥30一套;

0x03 手柄测试

USB接收器插入PC端,小车与PC组成局域网,实现 ROS 分布式通信。

注:如果使用的是 Ubuntu 虚拟机

  • 此处PC端如果使用虚拟机运行Ubuntu系统,为确保机器人与Ubuntu系统处于同一局域网中,需要将虚拟机的网络设置为桥接模式

           

  • 虚拟机需要手动选择欲连接的宿主机USB设备,此处选择手柄;

在 PC 端输入如下命令安装 '手柄测试软件' :

sudo apt install jstest-gtk

执行 jstest-gtk, 手柄测试软件,并选择自己的手柄,

观察并记录,按键响应。

最后,得出如下手柄按键映射表 ( 按键轴 与数组索引之间的映射 );

# 手柄按键映射表
# lr : left, right
# ud : up, down
# R2_f: froce
KEYMAP = {'axes': ['L3_lr', 'L3_ud', 'R3_lr', 'R3_ud', 'R2_f', 'L2_f', 'HALT_lr', 'HALT_ud'],
              'btns': ['A', 'B', '', 'X', 'Y', '', 'L1', 'R1', 'L2', 'R2', 'SELECT', 'START', '', 'L3', 'R3']}

0x04 安装相应的ROS功能包

在 PC 端,输入如下命令安装 ROS手柄驱动功能包 'joy_linux' :

sudo apt install ros-humble-joy-linux 		  # 安装手柄的驱动包

在 PC 端,输入如下命令安装 通用机器人摇杆控制功能包 'teleop-twist-joy' :

sudo apt install ros-humble-teleop-twist-joy  # 安装 通用机器人摇杆控制功能包

teleop-twist-joy 是在 joy_linux 功能包的基础之上,通过手柄摇杆向外发布 geometry_msgs/msg/Twist 'cmd_vel' 速度消息;

在 PC 端,依次执行如下命令,观察 /joy 话题输出;

ros2 launch teleop_twist_joy teleop-launch.py joy_dev:='dev/input/js1'  # 选择输入设备,启动 joy 功能包
ros2 topic echo /joy    # 输出 /joy 话题 

此时,按动手柄按键或摇杆,数据有响应且与jstest-gtk得出的手柄按键映射表一致;

0x05 编写手柄按键响应代码

这里我只希望手柄按键按下时响应一次;编写手柄类:

class MyController(object):
    """ 类属性 """
    # 手柄按键映射表
    # lr : left, right
    # ud : up, down
    # R2_f: froce
    KEYMAP = {'axes': ['L3_lr', 'L3_ud', 'R3_lr', 'R3_ud', 'R2_f', 'L2_f', 'HALT_lr', 'HALT_ud'],
              'btns': ['A', 'B', '', 'X', 'Y', '', 'L1', 'R1', 'L2', 'R2', 'SELECT', 'START', '', 'L3', 'R3']}

    AXES_NUM = KEYMAP['axes'].__len__()  # 摇杆数量
    BTNS_NUM = KEYMAP['btns'].__len__()  # 按键数量

    # 内部类
    class Btn(object):
        def __init__(self, name):
            """
            初始化类的属性
            """
            if name == '' or name not in MyController.KEYMAP['btns']:
                raise ValueError(
                    "Check keyname! keyname must in 'KEYMAP' and not ''.")

            self.name = name     # 按键名称,必须在 KEYMAP中 中注册;
            self.keymap = MyController.KEYMAP['btns'].index(name)    # 按键索引
            self.sta = False     # 当前按键状态,False为抬起,True按下,对应 /joy 话题状态;
            self.bak = self.sta  # 上次按键状态, 用于检测按键按下/抬起.

    # MyController
    def __init__(self):
        # 注册手柄 轴 及 按键, 会在 /joy 话题 callback 时遍历.
        self.axes = []
        self.btns = [self.Btn('X'),
                     self.Btn('Y'),
                     self.Btn('A'),
                     self.Btn('B'),
                     self.Btn('START'),]

    @staticmethod
    def on_btn_pressed_listener(node: Node, btn):
        if node != None:
            node.get_logger().info(btn.name + ' pressed!')

        if btn.name == 'X':
            # 蜂鸣器间隔作响3s
            pass

        elif btn.name == 'Y':
            # 小车 LED 闪烁3s
            pass

        elif btn.name == 'START':
           # OriginBOt机器人画圆;
           pass

        elif btn.name == 'A':
           # 输出当前机器人状态;
           pass

        elif btn.name == 'B':
          # 输出一次里程计信息;
          pass

    @staticmethod
    def on_btn_released_listener(node: Node, btn):
        if node != None:
            node.get_logger().info(btn.name + ' released!')

        if btn.name == 'X':
            pass
        elif btn.name == 'Y':
            pass

0x06 编写ROS节点代码

import rclpy                                     # ROS2 Python接口库
from rclpy.node import Node                      # ROS2 节点类

import asyncio
class TeleopSimNode(Node):
    # TeleopSimNode
    def __init__(self, name):
        super().__init__(name)    # ROS2节点父类初始化
        pass                      # 功能实现

async def ros_loop(node):
    while rclpy.ok():
        rclpy.spin_once(node)
        await asyncio.sleep(1e-4)   # 短暂挂起

    node.destroy_node()             # 销毁节点对象
    rclpy.shutdown()                # 关闭ROS2 Python接口


def main(args=None):                                   # ROS2节点主入口main函数
    rclpy.init(args=args)                              # ROS2 Python接口初始化
    node = TeleopSimNode("node_originbot_teleop")      # 创建ROS2节点对象并进行初始化
    asyncio.run(ros_loop(node))


# 测试用
if __name__ == '__main__':
    main()

使用asyncio协程的一次尝试,参考: m2-farzan/ros2-asyncio: How to use ROS 2 (rclpy) with Python async (github.com)

rclpy 内部似乎也运行了一个event_loop,需要不停的调用rclpy.spin()检查事件回调,不要在call_back中执行耗时操作或使用同步call,否则会产生死锁。

0x06 订阅/joy话题

在 TeleopSimNode 节点中编写了一个,__JoySub的内部类,并在 TeleopSimNode.__init__ 中初始化;

在 /joy 话题的回调函数中,遍历手柄按钮,当按钮改变时,回调其监听函数;

看一看代码,并思考一下。此时,是否已经实现了按键单击的代码框架了呢?

from sensor_msgs.msg import Joy
class TeleopSimNode(Node):

    # 内部类 /joy话题订阅者
    class __JoySub():
        def __init__(self, node):
            self.node = node
            self.ctrl = MyController()
            self.joy_sub = self.node.create_subscription(
                Joy, "/joy", self.joy_callback, 10)

        def joy_callback(self, joy_msg):
            for btn in self.ctrl.btns:
                btn.sta = joy_msg.buttons[btn.keymap]
                if btn.sta != btn.bak:  # 按键状态改变
                    if btn.sta:
                        self.ctrl.on_btn_pressed_listener(
                            self.node, btn)  # 按键按下
                    else:
                        self.ctrl.on_btn_released_listener(
                            self.node, btn)  # 按键弹起
                btn.bak = btn.sta

        def __del__(self):
            self.node.destroy_subscription(self.joy_sub)

    # TeleopSimNode
    def __init__(self, name):
        super().__init__(name)                # ROS2节点父类初始化

        self.joy_sub = self.__JoySub(self)    # 实例化 joy 话题订阅者

0x06 请求小车 蜂鸣器 & LED 服务

编译OriginBot机器人通信接口;

将小车源码中 /originbot/originbot_msgs 功能包,拷贝到PC端 dev_ws工作空间,随后colcon build编译;

from originbot_msgs.srv import OriginbotBuzzer, OriginbotLed  # 小车的服务接口

使用 async / await 处理 服务 请求及回调; 

class TeleopSimNode(Node): 
    # 内部 服务请求客户端类
    class __SvrClient():
        def __init__(self, node):
            self.node = node

            self.buzzer_cl = node.create_client(
                OriginbotBuzzer, '/originbot_buzzer')

            self.led_cl = node.create_client(OriginbotLed, 'originbot_led')

        async def req_buzzer(self, buzzer_on):
            # 循环等待服务器端成功启动
            while not self.buzzer_cl.wait_for_service(timeout_sec=1.0):
                self.node.get_logger().info('service not available, waiting again...')
                # BUG : 如果小车底盘程序未启动会进入死循环;

            req = OriginbotBuzzer.Request()
            req.on = buzzer_on

            try:
                response = await self.buzzer_cl.call_async(req)
            except Exception as e:
                self.node.get_logger().info(
                    '/originbot_buzzer Service call failed %r' % (e,))
            else:
                self.node.get_logger().info('/originbot_buzzer Service call Success!')

        async def req_led(self, led_on):
            # 循环等待服务器端成功启动
            while not self.led_cl.wait_for_service(timeout_sec=1.0):
                self.node.get_logger().info('service not available, waiting again...')
                # BUG : 如果小车底盘程序未启动会进入死循环;

            req = OriginbotLed.Request()
            req.on = led_on

            try:
                response = await self.led_cl.call_async(req)
            except Exception as e:
                self.node.get_logger().info(
                    '/originbot_led Service call failed %r' % (e,))
            else:
                self.node.get_logger().info('/originbot_led Service call Success!')


    # TeleopSimNode
    def __init__(self, name):
        super().__init__(name)                # ROS2节点父类初始化

        self.joy_sub = self.__JoySub(self)    # 实例化 joy 话题订阅者
        self.cl = self.__SvrClient(self)      # 实例化 服务请求客户端

在 MyController.on_btn_pressed_listener 中添加逻辑代码:

class MyController(object):
...
    @staticmethod
    def on_btn_pressed_listener(node: Node, btn):
        if node != None:
            node.get_logger().info(btn.name + ' pressed!')

        if btn.name == 'X':
            # 蜂鸣器间隔作响3s
            async def t_buzzer():
                for _x in range(6):  # 3*2=6 3个周期,一个周期两个状态
                    asyncio.create_task(node.cl.req_buzzer(
                        True if _x % 2 == 0 else False))
                    await asyncio.sleep(1)
            asyncio.create_task(t_buzzer())

        elif btn.name == 'Y':
            # 小车 LED 闪烁3s
            async def t_led():
                for _x in range(6):
                    asyncio.create_task(node.cl.req_led(
                        True if _x % 2 == 0 else False))
                    await asyncio.sleep(0.5)

            asyncio.create_task(t_led())
...

实现效果:

0x07 发布Twist话题使小车做圆周运动

当按下'START'按键时小车开始做圆周运动,再次按下时停止;

from geometry_msgs.msg import Twist       # 速度话题的消息
class BtnHandler():
    __draw = False  # 画圆 标志位

    @classmethod
    async def draw_circle(cls, node: Node):
        cls.__draw = not cls.__draw

        while (cls.__draw):
            twist = Twist()
            twist.linear.x = 0.2
            twist.angular.z = 0.8
            node.vel_pub.publish(twist)
            node.get_logger().info('Publishing: "linear: %0.2f, angular: %0.2f"' %
                                   (twist.linear.x, twist.angular.z))
            await asyncio.sleep(0.5)
        else:
            twist = Twist()
            twist.linear.x = 0.0
            twist.angular.z = 0.0
            node.vel_pub.publish(twist)

在 MyController.on_btn_pressed_listener 中添加逻辑代码:

    if btn.name == 'A':
         ...
    elif btn.name == 'START':
           # OriginBOt机器人画圆;
            asyncio.create_task(BtnHandler.draw_circle(node))   
    ...

实现效果:

0x08 订阅 里程计odom & 小车状态status 话题

from originbot_msgs.msg import OriginbotStatus
from nav_msgs.msg import Odometry         # 里程计消息
  • 只显示一次的写法:
class BtnHandler():
    ...

    @classmethod
    async def show_status(cls, node: Node):
        async def listener_callback(msg):
            node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on:  "%d"'
                                   % (msg.battery_voltage, msg.buzzer_on, msg.led_on))
            node.destroy_subscription(originbot_status_sub)  # 只显示一次

        originbot_status_sub = node.create_subscription(
            OriginbotStatus, "originbot_status", listener_callback, 10)


    @classmethod
    async def show_odom(cls, node: Node):
        async def odom_cb(msg):
            node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
                                   % (msg.pose.pose.position.x, msg.pose.pose.position.y))
            node.destroy_subscription(odom_sub)

        # 只显示一次里程计信息;
        odom_sub = node.create_subscription(
            Odometry, "odom", odom_cb, 10)

在 MyController.on_btn_pressed_listener 中添加逻辑代码:

    ...
    elif btn.name == 'B':
            asyncio.create_task(BtnHandler.show_odom(node))     # 输出一次里程计信息;
    ...
  • 重复显示的写法
class TeleopSimNode(Node):
   ...
    # 内部类, 蜂鸣器 发布者
    class __Sub():
        def __init__(self, node: Node) -> None:
            self.node = node

            # 小车状态
            self.originbot_status_show = False
            self.originbot_status_sub = node.create_subscription(
                OriginbotStatus, "originbot_status", self.originbot_status_cb, 10)

            # 小车 里程计
            self.odom_show = False
            self.odom_sub = node.create_subscription(
                Odometry, "odom", self.odom_cb, 10)

        def odom_cb(self, msg):
            if self.odom_show:
                self.node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
                                            % (msg.pose.pose.position.x, msg.pose.pose.position.y))

        def originbot_status_cb(self, msg):
            if self.originbot_status_show:
                self.node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on:  "%d"'
                                            % (msg.battery_voltage, msg.buzzer_on, msg.led_on))
   ...

实现效果:

0x09 编写Launch文件

配置teleop_twist_joy节点参数,

teleop_twist_joy_node:
  ros__parameters:
    axis_linear:          # 线速度
      x: 1
    scale_linear:          # 与 axis_linear 相乘
      x: 0.7
    scale_linear_turbo:    # 与 axis_linear 相乘, 涡轮增压
      x: 2.0

    axis_angular:    # 角速度
      yaw: 0
    scale_angular:   # 与 axis_angular 相乘
      yaw: 1.0

    enable_button: 8        # L2 shoulder button, 配置常速使能按钮
    enable_turbo_button: 9  # R2, 配置涡轮增压使能按钮
import os

import launch
import launch_ros.actions

from launch import LaunchDescription
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.conditions import IfCondition

from ament_index_python.packages import get_package_share_directory


def generate_launch_description():

    # 找到配置文件的完整路径
    joy_config = os.path.join(
        get_package_share_directory('learning_launch'),
        'config', 'my_ctrl.config.yaml'
    )
    # BUG 数据类型错误,导致参数文件无法加载;

    return LaunchDescription([
        launch.actions.DeclareLaunchArgument(name='use_turtulesim', default_value='false'),
        # 添加 小乌龟 节点
        launch_ros.actions.Node(
            package='turtlesim',
            executable='turtlesim_node',
            output='screen',
            condition=IfCondition(launch.substitutions.LaunchConfiguration('use_turtulesim')),
            remappings=[("/turtle1/cmd_vel", "/cmd_vel")]
        ),

        # 添加 originbot_teleop_node 节点
        launch_ros.actions.Node(
            package='learning_node',
            executable='node_orginbot_teleop',
            output='screen',
        ),

        # 添加 遥控 启动文件
        launch.actions.IncludeLaunchDescription(
            PythonLaunchDescriptionSource(
                [
                    os.path.join(
                        get_package_share_directory('teleop_twist_joy'),
                        'launch', 'teleop-launch.py'
                    )
                ]
            ),
            launch_arguments={'joy_dev': 'dev/input/js1',
                              'config_filepath': joy_config}.items()
        )
    ])

在 launch_arguments 中设置手柄输入设备 'joy_dev'。

0x0A 完整代码

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
@作者: ning
@说明: 通过手柄控制 OriginBot 小车, 增加按键控制功能;
    按下 'Y'     键: 请求 /originbot_led     svr,   LED闪烁3s;
    按下 'X'     键: 请求 /originbot_buzzer  svr,   蜂鸣器间隔作响3s;
    按下 'START' 键: 发布 /cmd_vel          topic,  小车开始做圆周运动; 
    按下 'A'     键: 订阅 /originbot_status topic,  输出状态信息,  再次按下取消;
    按下 'B'     键: 订阅 /odom             topic,  输出里程计消息,再次按下取消;
"""

import rclpy                                     # ROS2 Python接口库
from rclpy.node import Node                      # ROS2 节点类

import asyncio

from sensor_msgs.msg import Joy
from turtlesim.msg import Pose     # 测试用
from originbot_msgs.srv import OriginbotBuzzer, OriginbotLed  # 自定义的服务接口
from originbot_msgs.msg import OriginbotStatus
from geometry_msgs.msg import Twist       # 速度话题的消息
from nav_msgs.msg import Odometry         # 里程计消息


class BtnHandler():
    __draw = False  # 画圆 标志位

    @classmethod
    async def draw_circle(cls, node: Node):
        cls.__draw = not cls.__draw

        while (cls.__draw):
            twist = Twist()
            twist.linear.x = 0.2
            twist.angular.z = 0.8
            node.vel_pub.publish(twist)
            node.get_logger().info('Publishing: "linear: %0.2f, angular: %0.2f"' %
                                   (twist.linear.x, twist.angular.z))
            await asyncio.sleep(0.5)
        else:
            twist = Twist()
            twist.linear.x = 0.0
            twist.angular.z = 0.0
            node.vel_pub.publish(twist)

    @classmethod
    async def show_status(cls, node: Node):
        async def listener_callback(msg):
            node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on:  "%d"'
                                   % (msg.battery_voltage, msg.buzzer_on, msg.led_on))
            node.destroy_subscription(originbot_status_sub)  # 只显示一次

        originbot_status_sub = node.create_subscription(
            OriginbotStatus, "originbot_status", listener_callback, 10)

    @classmethod
    async def show_pose(cls, node: Node):
        # 只显示一次小乌龟 pose 状态
        async def listener_callback(msg):
            node.get_logger().info('x: "%0.2f", y: "%0.2f", theta:  "%0.2f"'
                                   % (msg.x, msg.y, msg.theta))
            node.destroy_subscription(turtle1_pose_sub)

        turtle1_pose_sub = node.create_subscription(
            Pose, "/turtle1/pose", listener_callback, 10)

    @classmethod
    async def show_odom(cls, node: Node):
        async def odom_cb(msg):
            node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
                                   % (msg.pose.pose.position.x, msg.pose.pose.position.y))
            node.destroy_subscription(odom_sub)

        # 只显示一次里程计信息;
        odom_sub = node.create_subscription(
            Odometry, "odom", odom_cb, 10)


class MyController(object):
    """ 类属性 """
    # 手柄按键映射表
    # lr : left, right
    # ud : up, down
    # R2_f: froce
    KEYMAP = {'axes': ['L3_lr', 'L3_ud', 'R3_lr', 'R3_ud', 'R2_f', 'L2_f', 'HALT_lr', 'HALT_ud'],
              'btns': ['A', 'B', '', 'X', 'Y', '', 'L1', 'R1', 'L2', 'R2', 'SELECT', 'START', '', 'L3', 'R3']}

    AXES_NUM = KEYMAP['axes'].__len__()  # 摇杆数量
    BTNS_NUM = KEYMAP['btns'].__len__()  # 按键数量

    # 内部类
    class Btn(object):
        def __init__(self, name):
            """
            初始化类的属性
            """
            if name == '' or name not in MyController.KEYMAP['btns']:
                raise ValueError(
                    "Check keyname! keyname must in 'KEYMAP' and not ''.")

            self.name = name     # 按键名称,必须在 KEYMAP中 中注册;
            self.keymap = MyController.KEYMAP['btns'].index(name)    # 按键索引
            self.sta = False     # 当前按键状态,False为抬起,True按下,对应 /joy 话题状态;
            self.bak = self.sta  # 上次按键状态, 用于检测按键按下/抬起.

    # MyController
    def __init__(self):
        # 注册手柄 轴 及 按键, 会在 /joy 话题 callback 时遍历.
        self.axes = []
        self.btns = [self.Btn('X'),
                     self.Btn('Y'),
                     self.Btn('A'),
                     self.Btn('B'),
                     self.Btn('START'),]

    @staticmethod
    def on_btn_pressed_listener(node: Node, btn):
        if node != None:
            node.get_logger().info(btn.name + ' pressed!')

        if btn.name == 'X':
            # 蜂鸣器间隔作响3s
            async def t_buzzer():
                for _x in range(6):  # 3*2=6 3个周期,一个周期两个状态
                    asyncio.create_task(node.cl.req_buzzer(
                        True if _x % 2 == 0 else False))
                    await asyncio.sleep(1)
            asyncio.create_task(t_buzzer())

        elif btn.name == 'Y':
            # 小车 LED 闪烁3s
            async def t_led():
                for _x in range(6):
                    asyncio.create_task(node.cl.req_led(
                        True if _x % 2 == 0 else False))
                    await asyncio.sleep(0.5)

            asyncio.create_task(t_led())

        elif btn.name == 'START':
            asyncio.create_task(BtnHandler.draw_circle(node))   # OriginBOt机器人画圆;

        elif btn.name == 'A':
            node.sub.originbot_status_show = not node.sub.originbot_status_show     # 输出当前机器人状态;

        elif btn.name == 'B':
            asyncio.create_task(BtnHandler.show_odom(node))     # 输出一次里程计信息;

    @staticmethod
    def on_btn_released_listener(node: Node, btn):
        if node != None:
            node.get_logger().info(btn.name + ' released!')

        if btn.name == 'X':
            pass
        elif btn.name == 'Y':
            pass


class TeleopSimNode(Node):

    # 内部 /joy话题订阅者
    class __JoySub():
        def __init__(self, node):
            self.node = node
            self.ctrl = MyController()
            self.joy_sub = self.node.create_subscription(
                Joy, "/joy", self.joy_callback, 10)

        def joy_callback(self, joy_msg):
            for btn in self.ctrl.btns:
                btn.sta = joy_msg.buttons[btn.keymap]
                if btn.sta != btn.bak:  # 按键状态改变
                    if btn.sta:
                        self.ctrl.on_btn_pressed_listener(
                            self.node, btn)  # 按键按下
                    else:
                        self.ctrl.on_btn_released_listener(
                            self.node, btn)  # 按键弹起
                btn.bak = btn.sta

        def __del__(self):
            self.node.destroy_subscription(self.joy_sub)

    # 内部类, 蜂鸣器 发布者
    class __Sub():
        def __init__(self, node: Node) -> None:
            self.node = node

            # 小车状态
            self.originbot_status_show = False
            self.originbot_status_sub = node.create_subscription(
                OriginbotStatus, "originbot_status", self.originbot_status_cb, 10)

            # 小乌龟pose测试用
            self.turtle_pose_show = False
            self.turtle_pose_sub = node.create_subscription(
                Pose, "/turtle1/pose", self.turtle_pose_cb, 10)

            # 小车 里程计
            self.odom_show = False
            self.odom_sub = node.create_subscription(
                Odometry, "odom", self.odom_cb, 10)

        def odom_cb(self, msg):
            if self.odom_show:
                self.node.get_logger().info('Robot Position: "%0.2f, %0.2f"'
                                            % (msg.pose.pose.position.x, msg.pose.pose.position.y))

        def turtle_pose_cb(self, msg):
            if self.turtle_pose_show:
                self.node.get_logger().info('x: "%0.2f", y: "%0.2f", theta:  "%0.2f"'
                                            % (msg.x, msg.y, msg.theta))

        def originbot_status_cb(self, msg):
            if self.originbot_status_show:
                self.node.get_logger().info('battery voltage: "%0.2f", buzzer on: "%d", led on:  "%d"'
                                            % (msg.battery_voltage, msg.buzzer_on, msg.led_on))

    # 内部 服务请求客户端类
    class __SvrClient():
        def __init__(self, node):
            self.node = node

            self.buzzer_cl = node.create_client(
                OriginbotBuzzer, '/originbot_buzzer')

            self.led_cl = node.create_client(OriginbotLed, 'originbot_led')

        async def req_buzzer(self, buzzer_on):
            # 循环等待服务器端成功启动
            while not self.buzzer_cl.wait_for_service(timeout_sec=1.0):
                self.node.get_logger().info('service not available, waiting again...')
                # BUG : 如果小车底盘程序未启动会进入死循环;

            req = OriginbotBuzzer.Request()
            req.on = buzzer_on

            try:
                response = await self.buzzer_cl.call_async(req)
            except Exception as e:
                self.node.get_logger().info(
                    '/originbot_buzzer Service call failed %r' % (e,))
            else:
                self.node.get_logger().info('/originbot_buzzer Service call Success!')

        async def req_led(self, led_on):
            # 循环等待服务器端成功启动
            while not self.led_cl.wait_for_service(timeout_sec=1.0):
                self.node.get_logger().info('service not available, waiting again...')
                # BUG : 如果小车底盘程序未启动会进入死循环;

            req = OriginbotLed.Request()
            req.on = led_on

            try:
                response = await self.led_cl.call_async(req)
            except Exception as e:
                self.node.get_logger().info(
                    '/originbot_led Service call failed %r' % (e,))
            else:
                self.node.get_logger().info('/originbot_led Service call Success!')

    # TeleopSimNode
    def __init__(self, name):
        super().__init__(name)                                 # ROS2节点父类初始化

        self.joy_sub = self.__JoySub(self)
        self.cl = self.__SvrClient(self)
        self.sub = self.__Sub(self)
        self.vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)


async def ros_loop(node):
    while rclpy.ok():
        rclpy.spin_once(node)
        await asyncio.sleep(1e-4)   # 短暂挂起

    node.destroy_node()             # 销毁节点对象
    rclpy.shutdown()                # 关闭ROS2 Python接口


def main(args=None):                                   # ROS2节点主入口main函数
    rclpy.init(args=args)                              # ROS2 Python接口初始化
    node = TeleopSimNode("node_originbot_teleop")      # 创建ROS2节点对象并进行初始化
    asyncio.run(ros_loop(node))


# 测试用
if __name__ == '__main__':
    main()

0x0B 参考链接