风格:

  • 所有Python代码必需放在一个模块的命名空间
  • ROS导出源码到依赖所在的路径,因此不能跟其他人的导入冲突
  • 推荐使用模块名作为包名
    # 不带msg/srvs的情况下:
    
    packagename
     |- src/
        |- packagename.py
     |- scripts/
        |- non-exported python files
    
    
    
    # 带msg/srvs的情况下:
    packagename
     |- src/
        |- packagename/
          |- __init__.py
          |- yourfiles.py
     |- scripts/
        |- non-exported python files
    

注意:py脚本要加可执行权限
chmod +x test.py

API参考:http://docs.ros.org/api/rospy/html/

1. 发布者和订阅者

发布者:

#!/usr/bin/env python
import rospy
from std_msgs.msg import String

def talker():
	# 定义发布者名称,消息类型以及队列个数
    pub = rospy.Publisher('chatter', String, queue_size=10)

	# anonymous=True 要求每个节点都有唯一的名称,避免冲突。这样可以运行多个talker.py
    rospy.init_node('talker', anonymous=True)

	# 设置发布的频率,单位是每秒次数
    rate = rospy.Rate(10) # 10hz
    while not rospy.is_shutdown():
        hello_str = "hello world %s" % rospy.get_time()
        rospy.loginfo(hello_str) # 在屏幕输出日志信息,写入到rosout节点
        pub.publish(hello_str) # 发布信息到主题
        rate.sleep()

if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException:
		pass

订阅者:

#!/usr/bin/env python

import rospy
from std_msgs.msg import String

def callback(data):
    rospy.loginfo(rospy.get_caller_id() + 'I heard %s', data.data)

def listener():

    rospy.init_node('listener', anonymous=True)

    rospy.Subscriber('chatter', String, callback)

    rospy.spin()

if __name__ == '__main__':
    listener()

Launch文件:

<launch>
    <node name="talker" pkg="beginner_tutorials" type="talker.py" />
    <node name="listener" pkg="beginner_tutorials" type="listener.py" />
</launch>

$ rosmsg show std_msgs/ColorRGBA
float32 r
float32 g
float32 b
float32 a
#!/usr/bin/env python

import rospy
from std_msgs.msg import ColorRGBA
def talker():
    #pub = rospy.Publisher('chatter', String)
    pub = rospy.Publisher('chatter_color', ColorRGBA)
    rospy.init_node('talker_color')
    while not rospy.is_shutdown():
        pub.publish(a=1.0)
        rospy.sleep(1.0)
if __name__ == '__main__':
    try:
        talker()
    except rospy.ROSInterruptException: pass
#!/usr/bin/env python

import rospy
from std_msgs.msg import ColorRGBA
def callback(data):
    rospy.loginfo(rospy.get_name()+ "I heard r=%s g=%s b=%s a=%s", data.r, data.g, data.b, data.a)

def listener():
    rospy.init_node('listener_color', anonymous=True)
    rospy.Subscriber("chatter_color", ColorRGBA, callback)
    rospy.spin()

if __name__ == '__main__':
    listener()

2. CompressedImage类型的订阅者和发布者

发布者:

#!/usr/bin/env python

import sys, time
import numpy as np
from scipy.ndimage import filters
import cv2
import roslib
import rospy
from sensor_msgs.msg import CompressedImage

VERBOSE=False

class image_feature:

    def __init__(self):
        '''Initialize ros publisher, ros subscriber'''
        # topic where we publish
        self.image_pub = rospy.Publisher("/output/image_raw/compressed",
            CompressedImage,  queue_size = 10)
        # self.bridge = CvBridge()

        # subscribed Topic
        self.subscriber = rospy.Subscriber("/camera/image/compressed",
            CompressedImage, self.callback)
        if VERBOSE :
            print "subscribed to /camera/image/compressed"


    def callback(self, ros_data):
        '''Callback function of subscribed topic. 
        Here images get converted and features detected'''
        if VERBOSE :
            print 'received image of type: "%s"' % ros_data.format

        #### direct conversion to CV2 ####
        np_arr = np.fromstring(ros_data.data, np.uint8)
        image_np = cv2.imdecode(np_arr, cv2.CV_LOAD_IMAGE_COLOR)
        
        #### Feature detectors using CV2 #### 
        # "","Grid","Pyramid" + 
        # "FAST","GFTT","HARRIS","MSER","ORB","SIFT","STAR","SURF"
        method = "GridFAST"
        feat_det = cv2.FeatureDetector_create(method)
        time1 = time.time()

        # convert np image to grayscale
        featPoints = feat_det.detect(
            cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY))
        time2 = time.time()
        if VERBOSE :
            print '%s detector found: %s points in: %s sec.'%(method,
                len(featPoints),time2-time1)

        for featpoint in featPoints:
            x,y = featpoint.pt
            cv2.circle(image_np,(int(x),int(y)), 3, (0,0,255), -1)
        
        cv2.imshow('cv_img', image_np)
        cv2.waitKey(2)

        #### Create CompressedIamge ####
        msg = CompressedImage()
        msg.header.stamp = rospy.Time.now()
        msg.format = "jpeg"
        msg.data = np.array(cv2.imencode('.jpg', image_np)[1]).tostring()
        # Publish new image
        self.image_pub.publish(msg)
        
        #self.subscriber.unregister()

def main(args):
    '''Initializes and cleanup ros node'''
    ic = image_feature()
    rospy.init_node('image_feature', anonymous=True)
    try:
        rospy.spin()
    except KeyboardInterrupt:
        print "Shutting down ROS Image feature detector module"
    cv2.destroyAllWindows()

if __name__ == '__main__':
    main(sys.argv)

3. 服务端和客户端

服务是通过srv文件定义,它包含请求信息和响应信息.rospy将这些SRV文件转换成Python源代码并创建三个类:服务的定义,请求消息和响应消息。

非常重要!!!

my_package/srv/Foo.srv → my_package.srv.Foo
my_package/srv/Foo.srv → my_package.srv.FooRequest
my_package/srv/Foo.srv → my_package.srv.FooResponse

srv/AddTwoInts.srv内容如下:

int64 A
int64 B
---
int64 Sum

服务端:

#!/usr/bin/env python

NAME = 'add_two_ints_server'

# import the AddTwoInts service
from packageName.srv import *
import rospy 

def add_two_ints(req):
    print("Returning [%s + %s = %s]" % (req.a, req.b, (req.a + req.b)))
    sum = req.a + req.b
    return AddTwoIntsResponse(sum) 			# AddTwoIntsResponse由服务生成的返回函数

def add_two_ints_server():
    rospy.init_node(NAME)

	# 处理函数调用实例化的AddTwoIntsRequest接收请求和返回实例化的AddTwoIntsResponse
    s = rospy.Service('add_two_ints', AddTwoInts, add_two_ints)
    print "Ready to add Two Ints"
    rospy.spin()

if __name__ == "__main__":
    add_two_ints_server()

客户端:

API传送门

#!/usr/bin/env python

import sys
import os
import rospy
# imports the AddTwoInts service 
from rospy_tutorials.srv import *

def add_two_ints_client(x, y):
    # 等待接入服务节点
    rospy.wait_for_service('add_two_ints')
    
    try:
        # create a handle to the add_two_ints service
        add_two_ints = rospy.ServiceProxy('add_two_ints', AddTwoInts)   # 创建服务的处理句柄
        
        print "Requesting %s+%s"%(x, y)
        
        # 简单风格
        resp1 = add_two_ints(x, y)

        # 正式风格
        resp2 = add_two_ints.call(AddTwoIntsRequest(x, y))

        if not resp1.sum == (x + y):
            raise Exception("test failure, returned sum was %s"%resp1.sum)
        if not resp2.sum == (x + y):
            raise Exception("test failure, returned sum was %s"%resp2.sum)
        return resp1.sum
    except rospy.ServiceException, e:
        print "Service call failed: %s"%e

def usage():
    return "%s [x y]"%sys.argv[0]

if __name__ == "__main__":
    
    argv = rospy.myargv()
    if len(argv) == 1:
        import random
        x = random.randint(-50000, 50000)
        y = random.randint(-50000, 50000)
    elif len(argv) == 3:
        try:
            x = int(argv[1])
            y = int(argv[2])
        except:
            print usage()
            sys.exit(1)
    else:
        print usage()
        sys.exit(1)
    print "%s + %s = %s"%(x, y, add_two_ints_client(x, y))
<launch>
    <node name="server" pkg="beginner_tutorials" type="add_two_ints_server.py" />
</launch>

4. action服务端和客户端

Averaging.action:

#goal definition
int32 samples
---
#result definition
float32 mean
float32 std_dev
---
#feedback
int32 sample
float32 data
float32 mean
float32 std_dev

Fibonacci.action:

#goal definition
int32 order
---
#result definition
int32[] sequence
---
#feedback
int32[] sequence

服务端:

#! /usr/bin/env python

import roslib; roslib.load_manifest('packageName')
import rospy

import actionlib

import actionlib_tutorials.msg  # 导入生成的消息,action会生成用于发送目标,接受反馈的消息

class FibonacciAction(object):
  # 创建用于发布的消息 feedback/result
  _feedback = actionlib_tutorials.msg.FibonacciFeedback()
  _result   = actionlib_tutorials.msg.FibonacciResult()

  def __init__(self, name):
    self._action_name = name
    # SimpleActionServer创建服务端,需4个参数:action_name、action type、callback函数(可选)、auto_start
    self._as = actionlib.SimpleActionServer(self._action_name, actionlib_tutorials.msg.FibonacciAction, execute_cb=self.execute_cb, auto_start = False)
    self._as.start() # 启动服务器
    
  def execute_cb(self, goal):
  	# 定义回调函数,当新目标到达,rospy.loginfo 输出相关信息,可知道正在执行的action.
    r = rospy.Rate(1)
    
    # 辅助变量
    success = True
    
    # 反馈
    self._feedback.sequence = []
    self._feedback.sequence.append(0)
    self._feedback.sequence.append(1)
    
    # 为用户发布信息到控制台
    rospy.loginfo('%s: Executing, creating fibonacci sequence of order %i with seeds %i, %i' % (self._action_name, goal.order, self._feedback.sequence[0], self._feedback.sequence[1]))
    
    ######################################################################################################
    # 开始执行操作
    for i in xrange(1, goal.order):
      # 检查是否客户端没有请求抢占
      if self._as.is_preempt_requested():
        rospy.loginfo('%s: Preempted' % self._action_name)
        self._as.set_preempted() # 当客户端替换了目标,服务器端应该取消目标,执行清除,调用set_preempted。preempt(抢占)
        success = False
        break
      self._feedback.sequence.append(self._feedback.sequence[i] + self._feedback.sequence[i-1])
      
      # 发布反馈
      self._as.publish_feedback(self._feedback)
      # 这个步骤是不必要的,为了演示的目的,顺序被计算为1hz
      r.sleep()
    ######################################################################################################  
    
    if success:
      self._result.sequence = self._feedback.sequence
      rospy.loginfo('%s: Succeeded' % self._action_name)
      self._as.set_succeeded(self._result) # succeeded(成功)
      
if __name__ == '__main__':
  rospy.init_node('fibonacci')
  FibonacciAction(rospy.get_name()) # rospy.get_name()获取节点名称,实例化动作服务器的类
  rospy.spin() 

客户端:

#! /usr/bin/env python

import roslib; roslib.load_manifest('actionlib_tutorials') # 查找manifest文件,增加依赖到python路径,不能多次调用load_manifest
import rospy
import actionlib
import actionlib_tutorials.msg

def fibonacci_client():
    # 创建SimpleActionClient,传递操作的类型
    client = actionlib.SimpleActionClient('fibonacci', actionlib_tutorials.msg.FibonacciAction)

    # 等待连接上服务器
    client.wait_for_server()

    # 创建目标并发送到action服务器端
    goal = actionlib_tutorials.msg.FibonacciGoal(order=20)
    client.send_goal(goal)

    # 等待服务器完成操作
    client.wait_for_result()

    # 打印执行操作的结果
    return client.get_result()  # A FibonacciResult

if __name__ == '__main__':
    try:
        # 初始化rospy节点,以便SimpleActionClient可以通过ROS发布和订阅。
        rospy.init_node('fibonacci_client_py')
        result = fibonacci_client()
        print "Result:", ', '.join([str(n) for n in result.sequence])
    except rospy.ROSInterruptException:
        print "program interrupted before completion"

5. TF树的使用

广播变换

#!/usr/bin/env python  
import roslib
roslib.load_manifest('learning_tf')
import rospy

import tf
import turtlesim.msg

def handle_turtle_pose(msg, turtlename):
	# 处理函数针对turtle的位置信息广播平移和旋转
	# 作为变换,从"world" 坐标系到"turtleX"坐标系进行发布
    br = tf.TransformBroadcaster() # 创建广播者
    br.sendTransform((msg.x, msg.y, 0),
                     tf.transformations.quaternion_from_euler(0, 0, msg.theta),
                     rospy.Time.now(),
                     turtlename,
                     "world")

if __name__ == '__main__':
    rospy.init_node('turtle_tf_broadcaster')
    turtlename = rospy.get_param('~turtle') # 节点获取turtle参数,这个指定一个turtle名,如"turtle1" or "turtle2".
   
    # 节点订阅"turtleX/pose" 主题,指定handle_turtle_pose回调函数
    # 回调函数有参数,节点名turtlename和相关信息turtlesim.msg.Pose
    rospy.Subscriber('/%s/pose' % turtlename,
                     turtlesim.msg.Pose,
                     handle_turtle_pose,
                     turtlename)
    rospy.spin()

监听变换

#!/usr/bin/env python  
import roslib
roslib.load_manifest('learning_tf')
import rospy
import math
import tf
import geometry_msgs.msg
import turtlesim.srv

if __name__ == '__main__':
    rospy.init_node('tf_turtle')

    listener = tf.TransformListener() # 创建了一个tf.transformlistener对象.一旦侦听器被创建,它就开始接收转换,并缓冲他们长达10秒。

	# 调用服务,生成小乌龟
    rospy.wait_for_service('spawn')
    spawner = rospy.ServiceProxy('spawn', turtlesim.srv.Spawn)
    spawner(4, 2, 0, 'turtle2')

    turtle_vel = rospy.Publisher('turtle2/cmd_vel', geometry_msgs.msg.Twist,queue_size=1)

    rate = rospy.Rate(10.0)
    while not rospy.is_shutdown():
        try:
        	# 通过listener.lookupTransform函数来实现获取变换,从/turtle2变换到/turtle1,rospy.Time(0)获取最新变换
            (trans,rot) = listener.lookupTransform('/turtle2', '/turtle1', rospy.Time(0))
        except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
            continue

        angular = 4 * math.atan2(trans[1], trans[0])
        linear = 0.5 * math.sqrt(trans[0] ** 2 + trans[1] ** 2)
        cmd = geometry_msgs.msg.Twist()
        cmd.linear.x = linear
        cmd.angular.z = angular
        turtle_vel.publish(cmd)

        rate.sleep()

增加参考系

#!/usr/bin/env python  
import roslib
roslib.load_manifest('learning_tf')

import rospy
import tf

if __name__ == '__main__':
    rospy.init_node('my_tf_broadcaster')
    br = tf.TransformBroadcaster()
    rate = rospy.Rate(10.0)
    while not rospy.is_shutdown():
        br.sendTransform((0.0, 2.0, 0.0),
                         (0.0, 0.0, 0.0, 1.0),
                         rospy.Time.now(),
                         "carrot1",
                         "turtle1")
        rate.sleep()

# 创建一个新的变换,从父”turtle1“新子”carrot1”。carrot1从turtle1偏移2米。

广播移动的参考系

#!/usr/bin/env python  
import roslib
roslib.load_manifest('learning_tf')

import rospy
import tf
import math

if __name__ == '__main__':
    rospy.init_node('my_tf_broadcaster')
    br = tf.TransformBroadcaster()
    rate = rospy.Rate(10.0)
    while not rospy.is_shutdown():
        t = rospy.Time.now().to_sec() * math.pi
        br.sendTransform((2.0 * math.sin(t), 2.0 * math.cos(t), 0.0),
                         (0.0, 0.0, 0.0, 1.0),
                         rospy.Time.now(),
                         "carrot1",
                         "turtle1")
        rate.sleep()

6. 其他

(1)使用参数

<launch>
  <!-- set /global_example parameter -->
  <param name="global_example" value="global value" />
  <group ns="foo">
    <!-- set /foo/utterance -->
    <param name="utterance" value="Hello World" />
    <param name="to_delete" value="Delete Me" />
    <!-- a group of parameters that we will fetch together -->
    <group ns="gains">
      <param name="P" value="1.0" />
      <param name="I" value="2.0" />
      <param name="D" value="3.0" />      
    </group>
    <node pkg="rospy_tutorials" name="param_talker" type="param_talker.py" output="screen">
      <!-- set /foo/utterance/param_talker/topic_name -->
      <param name="topic_name" value="chatter" />
    </node>
  </group>
</launch>
#!/usr/bin/env python

import rospy
from std_msgs.msg import String

def param_talker():
    rospy.init_node('param_talker')

    # 从参数服务器获取值。在本例中,我们从三个不同的名称空间获取参数:
    # 1) global (/global_example)
    # 2) parent (/foo/utterance)
    # 3) private (/foo/param_talker/topic_name)

    # fetch a /global parameter
    global_example = rospy.get_param("/global_example")
    rospy.loginfo("%s is %s", rospy.resolve_name('/global_example'), global_example)
    
    # fetch the utterance parameter from our parent namespace
    utterance = rospy.get_param('utterance')
    rospy.loginfo("%s is %s", rospy.resolve_name('utterance'), utterance)
    
    # fetch topic_name from the ~private namespace
    topic_name = rospy.get_param('~topic_name')
    rospy.loginfo("%s is %s", rospy.resolve_name('~topic_name'), topic_name)

    # fetch a parameter, using 'default_value' if it doesn't exist
    default_param = rospy.get_param('default_param', 'default_value')
    rospy.loginfo('%s is %s', rospy.resolve_name('default_param'), default_param)
    
    # fetch a group (dictionary) of parameters
    gains = rospy.get_param('gains')
    p, i, d = gains['P'], gains['I'], gains['D']
    rospy.loginfo("gains are %s, %s, %s", p, i, d)    

    # set some parameters
    rospy.loginfo('setting parameters...')
    rospy.set_param('list_of_floats', [1., 2., 3., 4.])
    rospy.set_param('bool_True', True)
    rospy.set_param('~private_bar', 1+2)
    rospy.set_param('to_delete', 'baz')
    rospy.loginfo('...parameters have been set')

    # delete a parameter
    if rospy.has_param('to_delete'):
        rospy.delete_param('to_delete')
        rospy.loginfo("deleted %s parameter"%rospy.resolve_name('to_delete'))
    else:
        rospy.loginfo('parameter %s was already deleted'%rospy.resolve_name('to_delete'))

    # search for a parameter
    param_name = rospy.search_param('global_example')
    rospy.loginfo('found global_example parameter under key: %s'%param_name)
    
    # publish the value of utterance repeatedly
    pub = rospy.Publisher(topic_name, String, queue_size=10)
    while not rospy.is_shutdown():
        pub.publish(utterance)
        rospy.loginfo(utterance)
        rospy.sleep(1)
        
if __name__ == '__main__':
    try:
        param_talker()
    except rospy.ROSInterruptException: pass

(2)使用日志

日志按严重程度,分为:DEBUG INFO WARN ERROR FATAL

  • DEBUG(调试) , 您永远不需要查看系统是否正常工作的信息
  • INFO(信息) ,少量的信息,可能是有用的用户
  • WARN(警告) , 用户可能会发现报警,并可能会影响应用程序的输出,但是该系统的预期工作的一部分
  • ERROR(错误), 一些严重的,已经错了的信息
  • FATAL(致命),事情发生了不可恢复的

日志API:

rospy.logdebug(msg, *args)
rospy.logwarn(msg, *args)
rospy.loginfo(msg, *args)
rospy.logerr(msg, *args)
rospy.logfatal(msg, *args)
#!/usr/bin/env python

import rospy
from  std_msgs.msg import String

def log_level():

    debug ="Received a message on topic X from caller Y"
    rospy.logdebug("it is debug: %s", debug)

    info = "Node initialized"
    rospy.loginfo("it is info: %s", info)

    warn = "Could not load configuration file from <path>. Using defaults"
    rospy.logwarn("it is warn: %s", warn)

    error = "Received unexpected NaN value in transform X. Skipping..."
    rospy.logerr("it is error:%s", error)

    fatal = "Motors have caught fire!"
    rospy.logfatal("it is fatal: %s", fatal)


if __name__ == '__main__':

    try:
        rospy.init_node('log_level', log_level=rospy.DEBUG)
        log_level()
    except rospy.ROSInterruptException :
        pass

(3)节点的初始化及关闭

  • 初始化节点
rospy.init_node('my_node_name')
rospy.init_node('my_node_name', anonymous=True)
rospy.init_node(name, anonymous=False, log_level=rospy.INFO, disable_signals=False) 

# anonymous=True,这样可以启动多个版本。
# log_level=rospy.INFO,日志级别。设置默认记录到rosout的信息。
# disable_signals=False,默认rospy注册信号处理器以便可以使用ctrl+c来退出。下面的情况下,你可以禁止:
	#不是从python的主线程调用init_node()。python只允许主线程注册信号处理器。
	#在wxPython或其他GUI工具运行rospy,它们有自己的退出处理。
	#你希望默认使用自己的信号处理器。
  • 节点关闭
rospy.on_shutdown(h)     #当处理关闭,会调用h函数,h函数不带参数。
rospy.signal_shutdown(reason)  # 在初始化节点,disable_signals为True时。手工处理,reason 为关闭理由,字符串内容

(4)名称和节点信息

  • rospy.get_name(),获取此节点的完全限定名称
  • rospy.get_namespace(),获取此节点的命名空间
  • rospy.get_node_uri(),获取这个节点的XMLRPC URI

(5)时间

在rospy由rospy.Time和rospy.Duration实现,前者特定时刻,后者为持续时间(可负)

  • int32 secs
  • int32 nsecs

https://www.ncnynl.com/archives/201611/1074.html

(6)异常

  • ROSException,ROS客户端基本异常类
  • ROSSerializationException,信息序列化的错误异常
  • ROSInitException,初始化ROS状态的错误异常
  • ROSInterruptException,操作中断的错误异常,经常在 rospy.sleep() and rospy.Rate 中用到
  • ROSInternalException,rospy内部错误的异常 (i.e. bugs).
  • ServiceException,ROS服务通讯相关错误的异常

参考:

  • https://www.ncnynl.com/archives/201611/1059.html
  • https://www.ncnynl.com/archives/201611/1060.html
  • https://www.ncnynl.com/archives/201611/1061.html
  • https://www.ncnynl.com/archives/201611/1062.html
  • https://www.ncnynl.com/archives/201611/1063.html
  • https://www.ncnynl.com/archives/201611/1064.html
  • https://www.ncnynl.com/archives/201611/1065.html
  • https://www.ncnynl.com/archives/201611/1066.html
  • https://www.ncnynl.com/archives/201611/1067.html
  • https://www.ncnynl.com/archives/201611/1068.html
  • https://www.ncnynl.com/archives/201611/1069.html
  • https://www.ncnynl.com/archives/201611/1070.html
  • https://www.ncnynl.com/archives/201611/1071.html
  • https://www.ncnynl.com/archives/201611/1072.html
  • https://www.ncnynl.com/archives/201611/1073.html
  • https://www.ncnynl.com/archives/201611/1074.html
  • https://www.ncnynl.com/archives/201611/1075.html
  • https://www.ncnynl.com/archives/201611/1076.html
  • https://www.ncnynl.com/archives/201611/1077.html
  • https://www.ncnynl.com/archives/201611/1078.html
  • https://www.ncnynl.com/archives/201611/1079.html
  • https://www.ncnynl.com/archives/201611/1080.html
  • https://www.ncnynl.com/archives/201611/1081.html
  • https://www.ncnynl.com/archives/201611/1082.html

  • https://www.ncnynl.com/archives/201611/1083.html