roscpp/rospy节点编写
-
-
- subscriber/advertiser编写
roscpp实例
rospy实例
server/client编写
roscpp实例
rospy实例
actionlib编写
roscpp实例
rospy实例
- subscriber/advertiser编写
-
ROS节点中主要以subscriber/advertiser、service/client、action_server/action_client形式存在,通过上述形式完成节点间的通信与组网。
subscriber/advertiser编写
subscriber与advertiser是消息通信机制中消息的订阅者与发布者,支持采用c++、python进行编写,以下将分别编写C++、python实例。
roscpp实例
- advertiser
在tutorial/src/tutorial_advertiser_node.cpp中添加如下代码:
#include <ros/ros.h>
#include <tutorial/RobotState.h>
int main(int argc,char **argv)
{
//ros::init():初始化节点
ros::init(argc,argv,"tutorial_advertiser_node");
//ros::NodeHandle:创建句柄
ros::NodeHandle nh;
//创建publisher
ros::Publisher pub = nh.advertise<tutorial::RobotState>("robot_state",1);
//设置频率,20Hz
ros::Rate r(20);
while(ros::ok())
{
//初始化消息
tutorial::RobotState msg;
//设置消息中字段的值
msg.mode_state = tutorial::RobotState::MANUAL_MODE;
msg.hardware_state = tutorial::RobotState::NORMAL;
//发布消息
pub.publish(msg);
ROS_INFO("I published a msg");
ros::spinOnce();
//节点进入休眠
r.sleep();
}
return 0;
}
- subscriber
在tutorial/src/tutorial_subscriber_node.cpp中添加如下代码:
#include <ros/ros.h>
#include <tutorial/RobotState.h>
/*
* @brief: 消息回调函数,处理收到的消息
* @param msg 消息类型为tutorial::RobotState::ConstPtr的消息
* @return void 无
*/
void callback(const tutorial::RobotState::ConstPtr &msg)
{
ROS_INFO("I received a message:mode_state is %d,hardware_state is %d.",msg->mode_state,msg->hardware_state);
}
int main(int argc,char **argv)
{
//初始化节点
ros::init(argc,argv,"tutorial_subscriber_node");
//创建句柄
ros::NodeHandle nh;
//创建subscriber
ros::Subscriber sub = nh.subscribe("robot_state",1,callback);
ros::spin();
return 0;
}
- 编译运行节点
在tutorial包的CMakeLists.txt添加如下代码:
add_executable(tutorial_sub_node src/tutorial_subscriber_node.cpp)
add_executable(tutorial_pub_node src/tutorial_advertiser_node.cpp)
add_dependencies(tutorial_sub_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
add_dependencies(tutorial_pub_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
target_link_libraries(tutorial_sub_node ${catkin_LIBRARIES})
target_link_libraries(tutorial_pub_node ${catkin_LIBRARIES})
保存后编译即可。
$ catkin_make
分别打开三个终端,运行如下指令:
运行roscore
roscore
运行advertiser
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_pub_node
运行subscriber
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_sub_node
rospy实例
在tutorial包中新建文件夹命名为script,并打开script文件夹添加tutorial_sub.py和tutorial_pub.py,添加如下代码:
- advertiser:tutorial_pub.py
#!/usr/bin/env python
import rospy
from tutorial.msg import RobotState
def talker():
rospy.init_node("tutorial_pub_node",anonymous=True)
pub = rospy.Publisher("robot_state",RobotState,queue_size=1);
rate = rospy.Rate(20);
while not rospy.is_shutdown():
msg = RobotState()
msg.mode_state = 0
msg.hardware_state = 0
pub.publish(msg)
rospy.loginfo("I publish a msg")
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
- subscriber:tutorial_sub.py
#!/usr/bin/env python
import rospy
from tutorial.msg import RobotState
def callback(msg):
rospy.loginfo("I received a msg: mode_state: %d, hardware_state: %d.",msg.mode_state,msg.hardware_state)
def listener():
rospy.init_node("tutorial_sub_node",anonymous=True)
sub = rospy.Subscriber("robot_state",RobotState,callback)
rospy.spin()
if __name__ == '__main__':
listener()
- 运行节点
首先修改python文件的属性,添加可执行权限
$ cd ~/ros_ws/
$ roscd tutorial/script/
$ chmod +x tutorial_sub.py tutorial_pub.py
打开三个终端,分别运行roscore、subscriber、advertiser
运行roscore:
$ roscore
运行subscriber
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_sub.py
运行advertiser
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_pub.py
server/client编写
server/client以应答模式存在,通常用于传递不需要高频率传递的消息或执行某种动作。支持roscpp、rospy编写,以下是C++、python编写的实例。
roscpp实例
- server:tutorial_server_node.cpp
//server的回调函数
bool callback(tutorial::RobotStateService::Request &req,tutorial::RobotStateService::Response &res)
{
if(req.state == 255)
{
res.mode_state = mode_state;
res.hardware_state = hardware_state;
}else
{
mode_state = req.state;
res.mode_state = mode_state;
res.hardware_state = hardware_state;
}
ROS_INFO("I had got a request.");
return true;
}
int main(int argc,char **argv)
{
ros::init(argc,argv,"tutorial_server_node");
ros::NodeHandle nh;
//定义server
ros::ServiceServer server = nh.advertiseService("robot_state_srv",callback);
ROS_INFO("server is Ready");
ros::spin();
return 0;
}
- client : tutorial_client_node.cpp
#include <ros/ros.h>
#include <tutorial/RobotStateService.h>
int main(int argc,char **argv)
{
ros::init(argc,argv,"tutorial_client_node");
ros::NodeHandle nh;
//定义client
ros::ServiceClient client = nh.serviceClient<tutorial::RobotStateService>("robot_state_srv");
//定义srv变量
tutorial::RobotStateService req;
req.request.state = 255;
//请求服务
if(client.call(req))
{
ROS_INFO("call server sucessed. mode_state: %d, hardware_state: %d",req.response.mode_state,req.response.hardware_state);
}else
{
ROS_ERROR("Failed to call service.");
return 1;
}
return 0;
}
- 编译运行
在CMakeLists.txt文件中添加如下代码:
add_executable(tutorial_server_node src/tutorial_server_node.cpp)
add_executable(tutorial_client_node src/tutorial_client_node.cpp)
add_dependencies(tutorial_server_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
add_dependencies(tutorial_client_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
target_link_libraries(tutorial_server_node ${catkin_LIBRARIES})
target_link_libraries(tutorial_client_node ${catkin_LIBRARIES})
运行编译指令编译功能包:
$ cd ~/ros_ws/
$ catkin_make
编译完成后打开三个终端分别运行roscore、server、client(运行指令可参考subscriber与advertiser部分),运行结果如图所示:
rospy实例
- server : tutorial_server.py
#! /usr/bin/env python
import sys
import rospy
#载入服务类型
from tutorial.srv import RobotStateService,RobotStateServiceRequest,RobotStateServiceResponse
mode_state = 0
hardware_state = 0
#服务回调函数
def callback(req):
global mode_state
res = RobotStateServiceResponse()
if req.state == 255:
res.mode_state = mode_state
res.hardware_state = hardware_state
else:
mode_state = req.state
res.mode_state = mode_state
res.hardware_state = hardware_state
rospy.loginfo("I had got a request")
return res
def server():
#初始化节点
rospy.init_node("tutorial_server",anonymous=True)
#定义server,指定回调函数
server = rospy.Service("robot_state_srv",RobotStateService,callback)
rospy.loginfo("Server is ready.")
rospy.spin()
if __name__ == '__main__':
server()
- client : tutorial_client.py
#!/usr/bin/env python
import rospy
#载入服务类型
from tutorial.srv import RobotStateService,RobotStateServiceRequest,RobotStateServiceResponse
def client():
#初始化节点
rospy.init_node("tutorial_client",anonymous=True)
#等待服务
rospy.wait_for_service("robot_state_srv")
try:
#定义客户端
client = rospy.ServiceProxy("robot_state_srv",RobotStateService)
#请求客户端
resp = client(255)
rospy.loginfo("service call sucessed.")
except rospy.ServiceException as e:
rospy.loginfo("service call failed: %s",e)
if __name__ == '__main__':
client()
运行节点
运行方法参照advertiser/subscriber的rospy实例运行即可,运行结果如图所示:
actionlib编写
actionlib是ROS中一个很重要的功能包集合,尽管在ROS中已经提供了srevice机制来满足请求—响应式的使用场景,但是假如某个请求执行时间很长,在此期间用户想查看执行的进度或者取消这个请求的话,service机制就不能满足了,但是actionlib可满足用户这种需求。如,控制机器人运动到地图中某一目标位置时,这个过程可能复杂而漫长,执行过程中还可能强制中断或反馈信息,这时就需要用到actionlib。
actionlib使用client-server工作模式,ActionClient 和ActionServer通过"ROS Action Protocol"进行通信,"ROS Action Protocol"以ROS消息方式进行传输。此外ActionClient 和ActionServer给用户提供了一些简单的接口,用户使用这些接口可以完成goal请求(client-side)和goal执行(server-side)。
ActionClient 和ActionServer之间使用action protocol通信,action protocol就是预定义的一组ROS message,这些message被放到ROS topic上在 ActionClient 和ActionServer之间进行传实现二者的沟通。
roscpp实例
*actionlib server
#include <ros/ros.h>
#include <tutorial/AutoDockingAction.h>
#include <actionlib/server/simple_action_server.h>
typedef actionlib::SimpleActionServer<tutorial::AutoDockingAction> Server;
void executeCB(const tutorial::AutoDockingGoal::ConstPtr &goal,Server *as)
{
int i = 10;
tutorial::AutoDockingResult result;
tutorial::AutoDockingFeedback feedback;
ros::Rate r(0.5);
//执行动作,可以在此处编写需要的动作
while(ros::ok())
{
if(i <= 0)
{
feedback.command.linear.x = i;
feedback.command.angular.z = 10 - i;
as->publishFeedback(feedback);
break;
}
i --;
ROS_INFO("%d action server is executing.",i);
r.sleep();
}
result.is_docked = true;
as->setSucceeded(result);
ROS_INFO("action server execute complete");
}
int main(int argc ,char** argv)
{
ros::init(argc,argv,"action_server_node");
ros::NodeHandle nh;
Server server(nh,"auto_dock",boost::bind(&executeCB,_1,&server),false);
server.start();
ros::spin();
return 0;
}
- actionlib client
#include <ros/ros.h>
#include <actionlib/client/simple_action_client.h>
#include <tutorial/AutoDockingAction.h>
typedef actionlib::SimpleActionClient<tutorial::AutoDockingAction> Client;
int main(int argc,char** argv)
{
ros::init(argc,argv,"action_client_node");
ros::NodeHandle nh;
//创建客户端
Client client("auto_dock",true);
//连接客户端
client.waitForServer();
tutorial::AutoDockingGoal goal;
goal.dock_pose.header.stamp = ros::Time::now();
goal.dock_pose.header.frame_id = "map";
goal.dock_pose.pose.position.x = 1.62830536433;
goal.dock_pose.pose.position.y = 10.4706644315;
goal.dock_pose.pose.position.z = 0.0;
goal.dock_pose.pose.orientation.x = 0.0;
goal.dock_pose.pose.orientation.y = 0.0;
goal.dock_pose.pose.orientation.z = 0.808817724421;
goal.dock_pose.pose.orientation.w = 0.588059426132;
goal.use_move_base = true;
//发送goal
client.sendGoal(goal);
while (ros::ok())
{
//循环查询状态
bool flag = client.waitForResult(ros::Duration(2.0));
if (client.getState() == actionlib::SimpleClientGoalState::SUCCEEDED)
{
ROS_INFO("succeeded!");
break;
}
else if(client.getState() == actionlib::SimpleClientGoalState::ACTIVE)
{
ROS_INFO("state = %s.", client.getState().toString().c_str());
continue;
}else
{
ROS_INFO("state = %s.", client.getState().toString().c_str());
return 0;
}
ROS_INFO("flag,%d",flag);
}
return 0;
}
- 编译运行
在CMakeList.txt中添加如下代码:
add_executable(action_server_node src/action_server_node.cpp)
add_executable(action_client_node src/action_client_node.cpp)
add_dependencies(action_server_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
add_dependencies(action_client_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
target_link_libraries(action_server_node ${catkin_LIBRARIES})
target_link_libraries(action_client_node ${catkin_LIBRARIES})
运行catkin_make编译节点后,运行节点(参照advertiser/subscriber节点运行方式),运行效果如图所示:
rospy实例
- actionlib server
#!/usr/bin/env python
import rospy
import actionlib
import tutorial.msg
class ActionServer(object):
_feedback = tutorial.msg.AutoDockingFeedback()
_result = tutorial.msg.AutoDockingResult()
def __init__(self,name):
self._action_name = name
#定义server
self._as = actionlib.SimpleActionServer(self._action_name,tutorial.msg.AutoDockingAction,execute_cb=self.execute_cb,auto_start = False)
self._as.start()
#定义回调函数
def execute_cb(self,goal):
rate = rospy.Rate(1)
success = True
i = 10
#主循环体,执行动作
while not rospy.is_shutdown():
#收到cancel指令或其他指令
if self._as.is_preempt_requested():
rospy.loginfo("%s: Preempted.",self._action_name)
self._as.set_preempted()
success = False
break
self._feedback.command.linear.x = i
self._feedback.command.angular.z = 10 - i
self._as.publish_feedback(self._feedback)
rospy.loginfo("%d action server is executing.",i)
i = i - 1
if i < 0:
break
rospy.loginfo("action server exit.")
#执行完成
if success:
self._result.is_docked = success
rospy.loginfo("%s: Succeeded.",self._action_name)
self._as.set_succeeded(self._result)
if __name__ == '__main__':
rospy.init_node('action_server')
server = ActionServer("auto_dock")
rospy.spin()
- actionlib client
#!/usr/bin/env python
import rospy
import actionlib
import tutorial.msg
def action_client():
#定义client
client = actionlib.SimpleActionClient("auto_dock",tutorial.msg.AutoDockingAction)
#等待连接
client.wait_for_server()
#设置并发送goal
goal = tutorial.msg.AutoDockingGoal()
goal.dock_pose.header.stamp = rospy.Time.now()
goal.dock_pose.header.frame_id = "map"
goal.dock_pose.pose.position.x = 1.62830536433
goal.dock_pose.pose.position.y = 10.4706644315
goal.dock_pose.pose.position.z = 0.0
goal.dock_pose.pose.orientation.x = 0.0
goal.dock_pose.pose.orientation.y = 0.0
goal.dock_pose.pose.orientation.z = 0.808817724421
goal.dock_pose.pose.orientation.w = 0.588059426132
goal.use_move_base = True
client.send_goal(goal)
#等待结果
client.wait_for_result()
return client.get_result()
if __name__ == '__main__':
try:
rospy.init_node("action_client")
result = action_client()
rospy.loginfo('rusult:'+str(result.is_docked))
except rospy.ROSInterruptException:
pass
运行节点
运行节点,参照advertiser/subscriber的rospy节点,运行结果如图所示:
评论(0)
您还未登录,请登录后发表或查看评论