文章目录


前言

  有限状态机是一款用于对象行为建模的工具,其主要作用是描述对象在生命周期内所经历的状态序列,以及如何响应来自外界的各种事件。。
  ROS提供了有限状态机功能包SMACH来处理机器人任务中的多个状态模块。

一、什么是SMACH

  SMACH是基于Python实现的一个功能强大且易于扩展的库,不依赖于ROS,可用于任意python项目ROS中的executive_smach功能包将SMACH和ROS集成到一起,为机器人复杂应用开发提供任务级的状态机框架,同时集成了actionlib和smach_viewer,用于管理action和状态机的可视化显示。

SMACH实现的功能:

  1. 快速原型设计:基于python语法,实现状态机模型的快速开发测试
  2. 复杂状态机模型:支持设计、维护、调试大型复杂的状态机
  3. 可视化:提供可视化的smach_viewer,可以看到完整状态机的状态跳转、数据流等信息

二、安装SMACH

命令如下:

sudo apt-get install ros-kinetic-executive-smach
sudo apt-get install ros-kinetic-executive-smach-visualization

三、运行一个状态机

参考胡老师的源码,功能包名为smach_tutorials,运行一个简单的状态机,命令如下:

roscore
rosrun smach_tutorials state_machine_simple.py

如果运行失败,则先添加可执行权限,效果如下:
在这里插入图片描述
可以看出状态机在进行状态跳转,一共两个状态FOO和BAR,我们借助可视化的smach_viewer,可以很清晰的看到完整状态机的状态跳转、数据流等信息。命令如下:

rosrun smach_viewer smach_viewer.py

效果如下:
在这里插入图片描述

四、实现剖析

上述例程的实现源码在state_machine_simple.py文件中,大体上分为如下几步:

1、 定义状态FOO和BAR

(1)初始化函数:初始化状态类,定义输出状态。
(2)执行函数:每个状态中的具体工作内容,工作结束后返回定义的输出值,该状态结束。

# 定义状态Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome1','outcome2'])
        self.counter = 0

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if self.counter < 3:
            self.counter += 1
            return 'outcome1'
        else:
            return 'outcome2'

# 定义状态Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome2'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        return 'outcome2'

2、创建状态机

    # 创建一个状态机
    sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])

同时指定状态机执行结束后的输出值outcome4和outcome5

3、 添加状态到状态机容器中

        # 使用add方法添加状态到状态机容器当中
        smach.StateMachine.add('FOO', Foo(), 
                               transitions={'outcome1':'BAR', 
                                            'outcome2':'outcome4'})

transitions代表状态跳转,如果FOO状态输出为outcome1,则跳转到BAR状态;如果状态输出为outcome2,则结束这个状态机,并且输出outcome4

4、创建内部监测服务器

    # 创建并启动内部监测服务器
    sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
    sis.start()

第一个参数,表示监测服务器的名称,第二个参数,表示所要监测的状态机,第三个参数,表示状态机的层级
目的是为了状态机可视化。

5、执行状态机

    # 开始执行状态机
    outcome = sm.execute()

五、状态间的数据传递

有时候后一个状态的工作需要使用前一个状态中的数据,这时就需要状态间的数据传递。运行实例,命令如下:

roscore
rosrun smach_tutorials user_data.py
rosrun smach_viewer smach_viewer.py

效果如下:
在这里插入图片描述

1、定义状态

# 定义状态Foo
class Foo(smach.State):
    def __init__(self):
        smach.State.__init__(self, 
                             outcomes=['outcome1','outcome2'],
                             input_keys=['foo_counter_in'],
                             output_keys=['foo_counter_out'])

    def execute(self, userdata):
        rospy.loginfo('Executing state FOO')
        if userdata.foo_counter_in < 3:
            userdata.foo_counter_out = userdata.foo_counter_in + 1
            return 'outcome1'
        else:
            return 'outcome2'


# 定义状态Bar
class Bar(smach.State):
    def __init__(self):
        smach.State.__init__(self, 
                             outcomes=['outcome1'],
                             input_keys=['bar_counter_in'])
        
    def execute(self, userdata):
        rospy.loginfo('Executing state BAR')
        rospy.loginfo('Counter = %f'%userdata.bar_counter_in)        
        return 'outcome1'

可以看到,在状态的初始化中多了两个参数:input_keys和output_keys,分别表示状态的输入/输出数据。在执行函数中,也多了一个userdata参数,这就是存储状态之间所传递数据的容器。FOO状态的输入/输出数据foo_counter_in和foo_counter_out就存储在userdata中。所以要访问或修改数据,需要使用userdata.foo_counter_out和userdata.foo_counter_in的形式。

2、定义状态间需传递的数据变量

sm.userdata.sm_counter = 0

3、添加状态到状态机容器中

        # 使用add方法添加状态到状态机容器当中
        smach.StateMachine.add('FOO', Foo(), 
                               transitions={'outcome1':'BAR', 
                                            'outcome2':'outcome4'},
                               remapping={'foo_counter_in':'sm_counter', 
                                          'foo_counter_out':'sm_counter'})
        smach.StateMachine.add('BAR', Bar(), 
                               transitions={'outcome1':'FOO'},
                               remapping={'bar_counter_in':'sm_counter'})

在这一步,多了一个remapping参数,类似于ROS重映射。这里将sm_counter映射为foo_counter_in、foo_counter_out和bar_counter_in,也就是给sm_counter取了一堆别名。这样,sm_counter在FOO中累加后就会被传递到BAR中,表明状态间的数据传递完成。

六、状态机嵌套

状态机是容器,支持嵌套功能,即在状态机中还可以嵌套实现一个内部状态机。
运行实例,命令如下:

roscore
rosrun smach_tutorials state_machine_nesting.py 
rosrun smach_viewer smach_viewer.py

效果如下:
在这里插入图片描述
灰色区域,就是状态SUB内部的嵌套状态机。

1、定义状态

# 定义状态Bas
class Bas(smach.State):
    def __init__(self):
        smach.State.__init__(self, outcomes=['outcome3'])

    def execute(self, userdata):
        rospy.loginfo('Executing state BAS')
        return 'outcome3'

2、创建状态机

    # 创建一个顶层状态机
    sm_top = smach.StateMachine(outcomes=['outcome5'])
    
    # 打开状态机容器
    with sm_top:

        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'SUB'})

        # 创建一个内嵌的状态机
        sm_sub = smach.StateMachine(outcomes=['outcome4'])

创建状态机sm_top,将其作为最顶层,在其中加入BAS状态,该状态在输出outcome3时会跳转到SUB状态。然后,定义一个内嵌的状态机。

3、 添加状态到状态机容器中

        # 打开状态机容器
        with sm_sub:

            # 使用add方法添加状态到状态机容器当中
            smach.StateMachine.add('FOO', Foo(), 
                                   transitions={'outcome1':'BAR', 
                                                'outcome2':'outcome4'})
            smach.StateMachine.add('BAR', Bar(), 
                                   transitions={'outcome1':'FOO'})

        smach.StateMachine.add('SUB', sm_sub,
                               transitions={'outcome4':'outcome5'})

将状态FOO和BAR添加到内嵌状态机sm_sub中,然后将sm_sub嵌套到sm_top中。

七、多状态并行

SMACH支持多个状态并列运行。
运行实例,命令如下:

roscore
rosrun smach_tutorials concurrence.py 
rosrun smach_viewer smach_viewer.py

效果如下:
在这里插入图片描述
可以看到FOO和BAR状态是并列运行的。

1、创建状态机

    # 创建一个顶层状态机
    sm_top = smach.StateMachine(outcomes=['outcome6'])
    
    # 打开状态机容器
    with sm_top:

        smach.StateMachine.add('BAS', Bas(),
                               transitions={'outcome3':'CON'})

        # 创建一个内嵌的状态机
        sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
                                   default_outcome='outcome4',
                                   outcome_map={'outcome5':
                                       { 'FOO':'outcome2',
                                         'BAR':'outcome1'}})

2、添加状态到状态机容器中

        # 打开状态机容器
        with sm_con:
            # 使用add方法添加状态到状态机容器当中
            smach.Concurrence.add('FOO', Foo())
            smach.Concurrence.add('BAR', Bar())

        smach.StateMachine.add('CON', sm_con,
                               transitions={'outcome4':'CON',
                                            'outcome5':'outcome6'})

使用Concurrence创建了一个同步状态机,default_outcome表示该状态机的默认输出是outcome4,依然会循环该状态机。
outcome_map参数,设置状态机同步运行的状态跳转,当FOO状态的输出为outcome2并且BAR状态的输出为outcome1时,状态机才会输出outcome5,从而跳转到顶层状态机中。