文章目录
前言
有限状态机是一款用于对象行为建模的工具,其主要作用是描述对象在生命周期内所经历的状态序列,以及如何响应来自外界的各种事件。。
ROS提供了有限状态机功能包SMACH来处理机器人任务中的多个状态模块。
一、什么是SMACH
SMACH是基于Python实现的一个功能强大且易于扩展的库,不依赖于ROS,可用于任意python项目ROS中的executive_smach功能包将SMACH和ROS集成到一起,为机器人复杂应用开发提供任务级的状态机框架,同时集成了actionlib和smach_viewer,用于管理action和状态机的可视化显示。
SMACH实现的功能:
- 快速原型设计:基于python语法,实现状态机模型的快速开发测试
- 复杂状态机模型:支持设计、维护、调试大型复杂的状态机
- 可视化:提供可视化的smach_viewer,可以看到完整状态机的状态跳转、数据流等信息
二、安装SMACH
命令如下:
sudo apt-get install ros-kinetic-executive-smach
sudo apt-get install ros-kinetic-executive-smach-visualization
三、运行一个状态机
参考胡老师的源码,功能包名为smach_tutorials,运行一个简单的状态机,命令如下:
roscore
rosrun smach_tutorials state_machine_simple.py
如果运行失败,则先添加可执行权限,效果如下:
可以看出状态机在进行状态跳转,一共两个状态FOO和BAR,我们借助可视化的smach_viewer,可以很清晰的看到完整状态机的状态跳转、数据流等信息。命令如下:
rosrun smach_viewer smach_viewer.py
效果如下:
四、实现剖析
上述例程的实现源码在state_machine_simple.py文件中,大体上分为如下几步:
1、 定义状态FOO和BAR
(1)初始化函数:初始化状态类,定义输出状态。
(2)执行函数:每个状态中的具体工作内容,工作结束后返回定义的输出值,该状态结束。
# 定义状态Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome1','outcome2'])
self.counter = 0
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if self.counter < 3:
self.counter += 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome2'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
return 'outcome2'
2、创建状态机
# 创建一个状态机
sm = smach.StateMachine(outcomes=['outcome4', 'outcome5'])
同时指定状态机执行结束后的输出值outcome4和outcome5
3、 添加状态到状态机容器中
# 使用add方法添加状态到状态机容器当中
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'})
transitions代表状态跳转,如果FOO状态输出为outcome1,则跳转到BAR状态;如果状态输出为outcome2,则结束这个状态机,并且输出outcome4
4、创建内部监测服务器
# 创建并启动内部监测服务器
sis = smach_ros.IntrospectionServer('my_smach_introspection_server', sm, '/SM_ROOT')
sis.start()
第一个参数,表示监测服务器的名称,第二个参数,表示所要监测的状态机,第三个参数,表示状态机的层级
目的是为了状态机可视化。
5、执行状态机
# 开始执行状态机
outcome = sm.execute()
五、状态间的数据传递
有时候后一个状态的工作需要使用前一个状态中的数据,这时就需要状态间的数据传递。运行实例,命令如下:
roscore
rosrun smach_tutorials user_data.py
rosrun smach_viewer smach_viewer.py
效果如下:
1、定义状态
# 定义状态Foo
class Foo(smach.State):
def __init__(self):
smach.State.__init__(self,
outcomes=['outcome1','outcome2'],
input_keys=['foo_counter_in'],
output_keys=['foo_counter_out'])
def execute(self, userdata):
rospy.loginfo('Executing state FOO')
if userdata.foo_counter_in < 3:
userdata.foo_counter_out = userdata.foo_counter_in + 1
return 'outcome1'
else:
return 'outcome2'
# 定义状态Bar
class Bar(smach.State):
def __init__(self):
smach.State.__init__(self,
outcomes=['outcome1'],
input_keys=['bar_counter_in'])
def execute(self, userdata):
rospy.loginfo('Executing state BAR')
rospy.loginfo('Counter = %f'%userdata.bar_counter_in)
return 'outcome1'
可以看到,在状态的初始化中多了两个参数:input_keys和output_keys,分别表示状态的输入/输出数据。在执行函数中,也多了一个userdata参数,这就是存储状态之间所传递数据的容器。FOO状态的输入/输出数据foo_counter_in和foo_counter_out就存储在userdata中。所以要访问或修改数据,需要使用userdata.foo_counter_out和userdata.foo_counter_in的形式。
2、定义状态间需传递的数据变量
sm.userdata.sm_counter = 0
3、添加状态到状态机容器中
# 使用add方法添加状态到状态机容器当中
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'},
remapping={'foo_counter_in':'sm_counter',
'foo_counter_out':'sm_counter'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome1':'FOO'},
remapping={'bar_counter_in':'sm_counter'})
在这一步,多了一个remapping参数,类似于ROS重映射。这里将sm_counter映射为foo_counter_in、foo_counter_out和bar_counter_in,也就是给sm_counter取了一堆别名。这样,sm_counter在FOO中累加后就会被传递到BAR中,表明状态间的数据传递完成。
六、状态机嵌套
状态机是容器,支持嵌套功能,即在状态机中还可以嵌套实现一个内部状态机。
运行实例,命令如下:
roscore
rosrun smach_tutorials state_machine_nesting.py
rosrun smach_viewer smach_viewer.py
效果如下:
灰色区域,就是状态SUB内部的嵌套状态机。
1、定义状态
# 定义状态Bas
class Bas(smach.State):
def __init__(self):
smach.State.__init__(self, outcomes=['outcome3'])
def execute(self, userdata):
rospy.loginfo('Executing state BAS')
return 'outcome3'
2、创建状态机
# 创建一个顶层状态机
sm_top = smach.StateMachine(outcomes=['outcome5'])
# 打开状态机容器
with sm_top:
smach.StateMachine.add('BAS', Bas(),
transitions={'outcome3':'SUB'})
# 创建一个内嵌的状态机
sm_sub = smach.StateMachine(outcomes=['outcome4'])
创建状态机sm_top,将其作为最顶层,在其中加入BAS状态,该状态在输出outcome3时会跳转到SUB状态。然后,定义一个内嵌的状态机。
3、 添加状态到状态机容器中
# 打开状态机容器
with sm_sub:
# 使用add方法添加状态到状态机容器当中
smach.StateMachine.add('FOO', Foo(),
transitions={'outcome1':'BAR',
'outcome2':'outcome4'})
smach.StateMachine.add('BAR', Bar(),
transitions={'outcome1':'FOO'})
smach.StateMachine.add('SUB', sm_sub,
transitions={'outcome4':'outcome5'})
将状态FOO和BAR添加到内嵌状态机sm_sub中,然后将sm_sub嵌套到sm_top中。
七、多状态并行
SMACH支持多个状态并列运行。
运行实例,命令如下:
roscore
rosrun smach_tutorials concurrence.py
rosrun smach_viewer smach_viewer.py
效果如下:
可以看到FOO和BAR状态是并列运行的。
1、创建状态机
# 创建一个顶层状态机
sm_top = smach.StateMachine(outcomes=['outcome6'])
# 打开状态机容器
with sm_top:
smach.StateMachine.add('BAS', Bas(),
transitions={'outcome3':'CON'})
# 创建一个内嵌的状态机
sm_con = smach.Concurrence(outcomes=['outcome4','outcome5'],
default_outcome='outcome4',
outcome_map={'outcome5':
{ 'FOO':'outcome2',
'BAR':'outcome1'}})
2、添加状态到状态机容器中
# 打开状态机容器
with sm_con:
# 使用add方法添加状态到状态机容器当中
smach.Concurrence.add('FOO', Foo())
smach.Concurrence.add('BAR', Bar())
smach.StateMachine.add('CON', sm_con,
transitions={'outcome4':'CON',
'outcome5':'outcome6'})
使用Concurrence创建了一个同步状态机,default_outcome表示该状态机的默认输出是outcome4,依然会循环该状态机。
outcome_map参数,设置状态机同步运行的状态跳转,当FOO状态的输出为outcome2并且BAR状态的输出为outcome1时,状态机才会输出outcome5,从而跳转到顶层状态机中。
评论(0)
您还未登录,请登录后发表或查看评论