横向控制 | 路径信息及可视化

Github链接: https://github.com/chanchanchan97/ROS

1. 全局路径的读取

waypoint_loader.py文件的功能是读取.csv文件中的路径点信息,并发布消息waypoints和base_path,其中waypoints包含了路径点的位置信息、小车的期望速度和表明小车是前进还是后退的状态位,base_path包含了实现Rviz可视化的路径点坐标。

(1) 主函数

def new_waypoint_loader(self, path):
	if os.path.isfile(path):
		waypoints, base_path = self.load_waypoints(path)
		self.publish(waypoints, base_path)
		rospy.loginfo('Waypoint Loded')
	else:
		rospy.logerr('%s is not a file', path)

说明:

  • os.path.isfile(path)用于判断某一对象是否为文件,其中path必须为绝对路径。
  • self.load_waypoints(path)用于加载.csv文件,并以包含路径信息的消息作为返回值。
  • self.publish(waypoints, base_path)用于发布话题消息。
  • rospy.loginfo(‘Waypoint Loded’)用于输出INFO日志信息。
  • rospy.logerr(‘%s is not a file’, path)当判断该对象不为文件时,输出ERROR日志信息。

(2) load_waypoints()路径加载函数

def load_waypoints(self, fname):
	waypoints = []
	base_path = Path()
	base_path.header.frame_id = 'world'
	with open(fname) as wfile:
		reader = csv.DictReader(wfile, CSV_HEADER)
		for wp in reader:
			p = Waypoint()
			p.pose.pose.position.x = float(wp['x'])
			p.pose.pose.position.y = float(wp['y'])
			q = self.quaternion_from_yaw(float(wp['yaw']))
			p.pose.pose.orientation = Quaternion(*q)
			p.twist.twist.linear.x = float(self.velocity)
			p.forward = True
			waypoints.append(p)

			path_element = PoseStamped()
			path_element.pose.position.x = p.pose.pose.position.x
			path_element.pose.position.y = p.pose.pose.position.y
			base_path.poses.append(path_element)
			
	waypoints = self.decelerate(waypoints)
	return waypoints,base_path

说明:

  • waypoints = []创建了一个空列表,命名为waypoint。
  • base_path = Path()和base_path.header.frame_id = 'world’定义了一个Path类的对象base_path,并以world作为frame id。
  • with open(fname) as wfile的作用是打开文件(保证在文件出错后能够正常关闭文件),其中fname在此处是文件的绝对路径。
  • reader = csv.DictReader(wfile, CSV_HEADER)以CSV_HEADER的构造格式,即‘x’、‘y’和‘yaw’格式读取文件内容。
  • for wp in reader:在此处遍历文件中的每一行。
    在这里插入图片描述

  • p = Waypoint()定义了一个Waypoint类的对象,其中Waypoint类在styx_msgs文件夹的Waypoint.msg中进行了自定义,包含了geometry_msgs/PoseStamped pose、geometry_msgs/TwistStamped twist和bool forward,分别表示小车在环境中的x、y坐标、小车的期望速度以及表明小车是前进还是后退的标志位。
    在这里插入图片描述
  • p.pose.pose.position.x = float(wp[‘x’])和p.pose.pose.position.y = float(wp[‘y’])用于将文件中的x、y所对应的数据填充到相应的消息内容中。
  • q = self.quaternion_from_yaw(float(wp[‘yaw’]))即调用tf.transformations.quaternion_from_euler(0., 0., yaw)该函数,用于将 (0., 0., yaw)欧拉角转换成四元数quaternion。
  • p.pose.pose.orientation = Quaternion(*q)该操作相当于p.pose.pose.orientation = Quaternion(q[0], q[1], q[2], q[3]),用于将元祖扩展成参数列表,并将四元数数据填充到相应的消息内容中。
  • p.twist.twist.linear.x = float(self.velocity)用于将期望的线速度值填充到相应的消息内容中。
  • waypoints.append§用于将Waypoint类的对象p,即路径信息添加到列表waypoints中。
  • path_element = PoseStamped()定义了一个PoseStamped类的对象,用于在Rviz中可视化路径。
  • waypoints = self.decelerate(waypoints)根据小车当前位置与终点位置来控制小车速度,并保证在终点位置速度为零。

(3) decelerate()小车减速函数

def decelerate(self, waypoints):
	last = waypoints[-1]
	last.twist.twist.linear.x = 0.
	for wp in waypoints[:-1][::-1]:
		dist = self.distance(wp.pose.pose.position, last.pose.pose.position)
		vel = math.sqrt(2 * MAX_DECEL * dist)
		if vel < 1.:
			vel = 0.
		wp.twist.twist.linear.x = min(vel, wp.twist.twist.linear.x)
	return waypoints

说明:

  • last = waypoints[-1]用于将waypoints列表中的最后一个值,即路径的终点信息赋给变量last。
  • for wp in waypoints[:-1][::-1]:用于遍历waypoints列表中的所有值。
  • dist = self.distance(wp.pose.pose.position, last.pose.pose.position)用于计算当前坐标点与终点坐标的距离。
  • vel = math.sqrt(2 * MAX_DECEL * dist)根据小车当前位置距离终点的路程dist、人为设定的减速度MAX_DECEL和到达终点时的速度(速度为零)来计算得到小车当前的期望速度vel,其中所运用到的是高中物理公式Vt2 - V02 = 2as。
  • wp.twist.twist.linear.x = min(vel, wp.twist.twist.linear.x)表示在距离终点较远的位置,按照wp.twist.twist.linear.x的默认期望速度行驶,而在距离终点较近的位置,按照自定义的减速度开始减速行驶。

(4) distance()距离计算函数

def distance(self, p1, p2):
	x, y, z = p1.x - p2.x, p1.y - p2.y, p1.z - p2.z
	return math.sqrt(x*x + y*y + z*z)

说明:

  • 该函数用于计算两个坐标点之间的距离。

(5) quaternion_from_yaw()四元数转换函数

def quaternion_from_yaw(self, yaw):
	return tf.transformations.quaternion_from_euler(0., 0., yaw)

说明:

  • 该函数用于将欧拉角转换成四元数形式。

(6) kmph2mps()速度单位换算函数

def kmph2mps(self, velocity_kmph):
	return (velocity_kmph * 1000.) / (60. * 60.)

说明:

  • 该函数用于将单位为km/h的速度值转换成以m/s为单位。

(7) publish()消息发布函数

def publish(self, waypoints, base_path):
	lane = Lane()
	lane.header.frame_id = '/world'
	lane.header.stamp = rospy.Time(0)
	lane.waypoints = waypoints
	self.pub.publish(lane)
	self.pub_path.publish(base_path)

说明:

  • 该函数用于发布话题消息waypoints和base_path。

(8) 初始化函数

def __init__(self):
	rospy.init_node('waypoint_loader', log_level=rospy.DEBUG)
	
	self.pub = rospy.Publisher('/base_waypoints', Lane, queue_size=1, latch=True)
	self.pub_path = rospy.Publisher('/base_path', Path, queue_size=1, latch=True)

	self.velocity = self.kmph2mps(rospy.get_param('~velocity'))
	self.new_waypoint_loader(rospy.get_param('~path'))
	rospy.spin()

说明:

  • rospy.init_node(‘waypoint_loader’, log_level=rospy.DEBUG)用于初始化ROS节点,在节点完全初始化之前,消息不会出现在/rosout话题上,因此可能看不到初始消息。要想在/rosout上看到logdebug消息,可以将log_level参数传递给rospy.init_node()。
  • self.pub = rospy.Publisher(‘/base_waypoints’, Lane, queue_size=1, latch=True)和self.pub_path = rospy.Publisher(‘/base_path’, Path, queue_size=1, latch=True)分别用于创建以/base_waypoints和/base_path为话题消息的发布者。其中latch=True将保存最后发布的消息并将其发送给连接的任何将来的订户,这对于诸如地图等缓慢变化的数据或静态数据很有用。
  • self.velocity = self.kmph2mps(rospy.get_param(‘~velocity’))用于加载参数velocity的值,并转换成以m/s为单位。
  • self.new_waypoint_loader(rospy.get_param(‘~path’))用于加载参数path所对应的文件路径,并发布相关的话题消息。

2. 局部路径和历史路径的更新

waypoint_updater.py文件的功能是通过KD-tree查询距离小车最近的路径点,以该点为起始点设置局部路径,并在Rviz可视化工具中显示局部路径。
(1) 初始化函数

def __init__(self):
	rospy.init_node('waypoint_updater')

	rospy.Subscriber('/smart/rear_pose', PoseStamped, self.pose_cb)
	rospy.Subscriber('/base_waypoints', Lane, self.waypoints_cb)

	self.final_waypoints_pub = rospy.Publisher('final_waypoints', Lane, queue_size=1)
	self.final_path_pub = rospy.Publisher('final_path', Path, queue_size=1)
	self.final_waypoints_pub = rospy.Publisher('real_waypoints', Lane, queue_size=1)
	self.final_path_pub = rospy.Publisher('real_path', Path, queue_size=1)

	# TODO: Add other member variables you need below
	self.real_waypoints = []
	self.base_waypoints = None
	self.waypoints_2d = None
	self.waypoint_tree = None
	self.pose = None

	# rospy.spin()
	rate = rospy.Rate(20)
	while not rospy.is_shutdown():
		if self.pose and self.base_waypoints and self.waypoint_tree:
			# Get closest waypoint
			closest_waypoint_idx = self.get_closest_waypoint_idx()
			self.publish_waypoints(closest_waypoint_idx)
		rate.sleep()

说明:

  • rospy.init_node(‘waypoint_updater’)用于初始化ROS节点。
  • rospy.Subscriber(‘/smart/rear_pose’, PoseStamped, self.pose_cb)和rospy.Subscriber(‘/base_waypoints’, Lane, self.waypoints_cb)分别用于创建以/smart/rear_pose和/base_waypoints为话题消息的订阅者,其中/smart/rear_pose话题消息内容为小车后轮中心点的坐标,/base_waypoints 话题消息内容为全局路径点坐标及小车期望速度等,pose_cb和waypoints_cb分别为/smart/rear_pose和/base_waypoints的消息回调函数。
  • self.final_waypoints_pub = rospy.Publisher(‘final_waypoints’, Lane, queue_size=1)和self.final_path_pub = rospy.Publisher(‘final_path’, Path, queue_size=1)分别创建以final_waypoints和final_path为话题消息的发布者,用于发布局部路径信息。
  • self.final_waypoints_pub = rospy.Publisher(‘real_waypoints’, Lane, queue_size=1)和self.final_path_pub = rospy.Publisher(‘real_path’, Path, queue_size=1)分别创建以real_waypoints和real_path为话题消息的发布者,用于发布历史路径信息。
  • rate = rospy.Rate(20)用于设置ROS节点的更新频率为20Hz。
  • while not rospy.is_shutdown():表示在ROS节点未关闭的条件下,执行下面的操作。
  • if self.pose and self.base_waypoints and self.waypoint_tree:表示在pose、base_waypoints和waypoint_tree话题消息内容非空的条件下,执行下面的操作。
  • closest_waypoint_idx = self.get_closest_waypoint_idx()的作用是返回一个距离当前小车位置最近的路径点的横坐标x。
  • self.publish_waypoints(closest_waypoint_idx)用于发布话题消息closest_waypoint_idx。

(2) 小车位置回调函数

def pose_cb(self, msg):
	self.pose = msg

说明:

  • 该函数为/smart/rear_pose话题消息的回调函数,作用是将该话题消息内容赋给self.pose。

(3) 路径点回调函数

def waypoints_cb(self, waypoints):
	self.base_waypoints = waypoints
	if not self.waypoints_2d:
		self.waypoints_2d = [[waypoint.pose.pose.position.x, waypoint.pose.pose.position.y] for waypoint in waypoints.waypoints]
		self.waypoint_tree = KDTree(self.waypoints_2d)

说明:

  • 该函数为/base_waypoints话题消息的回调函数。
  • self.waypoints_2d = [[waypoint.pose.pose.position.x, waypoint.pose.pose.position.y] for waypoint in waypoints.waypoints]将waypoints话题消息中的x、y轴坐标提取到一个二维列表self.waypoints_2d中。二维列表self.waypoints_2d的内容如下:

    在这里插入图片描述

  • self.waypoint_tree = KDTree(self.waypoints_2d)的作用是构建一个二维的KD-tree。

(4) get_closest_waypoint_idx()最近路径点获取函数

def get_closest_waypoint_idx(self):
	x = self.pose.pose.position.x
	y = self.pose.pose.position.y

	q = Waypoint()
	q.pose.pose.position.x = x
	q.pose.pose.position.y = y
	self.real_waypoints.append(q)

	closest_idx = self.waypoint_tree.query([x,y],1)[1]

	return closest_idx

说明:

  • q = Waypoint()的作用是创建一个Waypoint类型的对象q,包含坐标点信息。
  • self.real_waypoints.append(q)的作用是将最新的实时位置信息添加到列表中。
  • closest_idx = self.waypoint_tree.query([x,y],1)[1]的作用是在KD-tree中查询距离当前小车位置最近的一个路径点的横坐标x。

(5) publish_waypoints()消息发布函数

def publish_waypoints(self, closest_idx):
	# fill in final waypoints to publish
	lane = Lane()
	lane.header = self.base_waypoints.header
	lane.waypoints = self.base_waypoints.waypoints[closest_idx:closest_idx + LOOKAHEAD_WPS]

	real_lane = Lane()
	real_lane.header = self.base_waypoints.header
	real_lane.waypoints = self.real_waypoints

	# fill in path for visulization in Rviz
	path = Path()
	path.header.frame_id = '/world'
	for p in lane.waypoints:
		path_element = PoseStamped()
		path_element.pose.position.x = p.pose.pose.position.x
		path_element.pose.position.y = p.pose.pose.position.y
		path.poses.append(path_element)

	real_path = Path()
	real_path.header.frame_id = '/world'
	for m in real_lane.waypoints:
	real_path_element = PoseStamped()
	real_path_element.pose.position.x = m.pose.pose.position.x
	real_path_element.pose.position.y = m.pose.pose.position.y
	real_path.poses.append(real_path_element)

	self.final_waypoints_pub.publish(lane)
	self.final_path_pub.publish(path)
	self.final_waypoints_pub.publish(real_lane)
	self.final_path_pub.publish(real_path)

说明:

  • lane = Lane()的作用是创建一个Lane类型的对象lane。
  • lane.waypoints = self.base_waypoints.waypoints[closest_idx:closest_idx + LOOKAHEAD_WPS]的作用是将距离小车最近的路径点以及之后的LOOKAHEAD_WPS个路径点信息填充到final_waypoints话题消息中,其中LOOKAHEAD_WPS是自定义的局部路径点个数。
  • path.header.frame_id = '/world’设置/world为局部路径可视化的参考系坐标。
  • for p in lane.waypoints:的作用是遍历上述的局部路径点。
  • path_element.pose.position.x = p.pose.pose.position.x和path_element.pose.position.y = p.pose.pose.position.y是将局部路径点的坐标信息填充到final_path话题信息中,用于在Rviz可视化工具中显示局部路径。
  • self.final_waypoints_pub.publish(lane)和self.final_path_pub.publish(path)的作用是发布话题消息final_waypoints和final_path。
  • 历史路径消息real_path和real_lane的创建和定义同理。

3. 路径可视化

打开一个新的终端,输入如下指令:

$ rosrun rviz rviz

在左菜单栏中加入两个Path的可视化插件,分别将话题信息Topic设置为/base_path和/final_path,即全局路径和局部路径。显示效果如下图。
在这里插入图片描述
在这里插入图片描述

在Gazebo中移动小车模型,由于小车当前位置发生变化,局部路径也因此发生变化,如下图中Rviz所显示。