观前提醒:本博客介绍如何使用Python订阅ROS话题,并将接收到的消息保存到SQL数据库中,包括MySQL和SQL Server两种情况。
使用Python订阅ROS话题并将消息保存至MySQL数据库
下面我们将详细介绍如何使用Python订阅ROS话题,并将接收的数据保存到MySQL数据库中。这种技术可以用于机器人数据的记录、分析和回放。
第一步:安装Python依赖库
我们将使用rospy
来订阅ROS话题,以及mysql-connector-python
来连接MySQL数据库。在你的Python环境中,使用以下命令来安装这些库:
pip install mysql-connector-python
注意:rospy
通常作为ROS的一部分而已经安装,因此不需要单独安装。
第二步:配置MySQL数据库
在MySQL中,你需要创建一个新的数据库和表来存储从ROS话题收到的消息。
- 登录到MySQL服务器。
- 创建一个新数据库:
CREATE DATABASE ros_messages;
- 选择数据库:
USE ros_messages;
- 创建一个表来存储消息:
CREATE TABLE sensor_messages (
id INT AUTO_INCREMENT PRIMARY KEY,
topic VARCHAR(100),
message TEXT,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
这将创建一个名为sensor_messages
的表,其中包括id
,topic
,message
和received_at
字段。
第三步:编写Python脚本
现在,我们来编写Python脚本来订阅ROS话题,并将收到的消息保存到MySQL数据库中。
导入必要的模块
import rospy
from std_msgs.msg import String
import mysql.connector
from mysql.connector import Error
这里我们导入了rospy
模块来订阅ROS话题,std_msgs.msg
中的String
用于指定消息类型,以及mysql.connector
来管理MySQL数据库连接。
建立数据库连接
def create_db_connection():
try:
connection = mysql.connector.connect(
host='localhost',
database='ros_messages',
user='your_username',
password='your_password'
)
return connection
except Error as e:
print(e)
return None
create_db_connection
函数尝试连接到MySQL数据库,并在失败时打印错误。
定义消息处理函数
def handle_sensor_message(data):
db_connection = create_db_connection()
if db_connection is not None:
cursor = db_connection.cursor()
sql_insert_query = """
INSERT INTO sensor_messages (topic, message)
VALUES (%s, %s)
"""
cursor.execute(sql_insert_query, ('/sensor_topic', data.data))
db_connection.commit()
cursor.close()
db_connection.close()
handle_sensor_message
是一个回调函数,用来处理接收到的消息。它将连接到数据库,插入新的消息记录,然后关闭数据库连接。
初始化ROS节点并订阅话题
def main():
rospy.init_node('sensor_message_saver', anonymous=True)
rospy.Subscriber('/sensor_topic', String, handle_sensor_message)
rospy.spin()
main
函数初始化ROS节点,订阅名为/sensor_topic
的话题,并注册了handle_sensor_message
作为回调函数。rospy.spin()
使得脚本保持运行并等待消息的到来。
第四步:运行一下吧
将上述代码保存为sensor_message_saver.py
。确保ROScore和MySQL服务正在运行后,执行以下命令来启动脚本:
python sensor_message_saver.py
现在,每当有新消息发布到/sensor_topic
话题时,你的脚本就会将这些消息保存到MySQL数据库中。
话题发送
我们使用一个py发送话题
import rospy
from some_package.msg import SensorData # 假设消息类型定义在 some_package 包中
# 初始化 ROS 节点
rospy.init_node('sensor_publisher')
# 定义发布者,发布到 /sensor_topic 话题,消息类型为 SensorData
pub = rospy.Publisher('/sensor_topic', SensorData, queue_size=10)
# 创建消息实例
sensor_msg = SensorData()
sensor_msg.sensor_id = 123
sensor_msg.temperature = 25.2
sensor_msg.humidity = 58
# 发布消息
pub.publish(sensor_msg)
# 等待直到节点关闭
rospy.spin()
查看一下数据库内话题信息有没有正确存放
注意事项
- 如果数据库和ROS不运行在一个主机上,mySQL需要先进行配置局域网访问
使用Python订阅ROS话题并将消息保存至SQL Server数据库
下面我们将一步步介绍如何使用Python订阅话题,并将接收到的消息保存到SQL Server数据库中。
第一步:安装必要的库
首先,确保已经安装了用于ROS的Python库和用于SQL Server的数据库适配器pyodbc
。
在ROS中,通常我们会使用rospy
库来进行话题的订阅和发布。对于SQL Server,我们将使用pyodbc
库来与数据库进行交互。
pip install pyodbc
第二步:创建SQL Server数据库和表
在SQL Server Management Studio (SSMS) 或者通过sqlcmd工具,执行以下SQL脚本来创建一个数据库和表以存储来自ROS话题的消息。
CREATE DATABASE RosData;
USE RosData;
CREATE TABLE SensorMessages (
Id INT PRIMARY KEY IDENTITY(1,1),
TopicName NVARCHAR(255) NOT NULL,
MessageData NVARCHAR(MAX) NOT NULL,
ReceivedDateTime DATETIME NOT NULL DEFAULT GETDATE()
);
此脚本创建了一个名为RosData
的数据库和一个名为SensorMessages
的表,表中包括自增的主键Id
,话题名称TopicName
,消息数据MessageData
和接收时间戳ReceivedDateTime
。
第三步:编写Python脚本来订阅ROS话题
现在我们将创建一个Python脚本来订阅ROS话题,并将接收到的消息保存到SQL Server数据库中。
导入必要的模块
import rospy
from std_msgs.msg import String # 假设我们订阅的话题类型为std_msgs/String
import pyodbc
设置SQL Server数据库连接
def create_database_connection():
try:
connection = pyodbc.connect(
'DRIVER={ODBC Driver 17 for SQL Server};'
'SERVER=your_server;'
'DATABASE=RosData;'
'UID=your_username;'
'PWD=your_password;'
)
return connection
except pyodbc.Error as e:
print(f"Error connecting to SQL Server: {e}")
return None
这段代码定义了一个函数create_database_connection
,用于建立与SQL Server数据库的连接。
插入数据到数据库
def insert_message(connection, topic_name, message_data):
cursor = connection.cursor()
query = "INSERT INTO SensorMessages (TopicName, MessageData) VALUES (?, ?)"
cursor.execute(query, topic_name, message_data)
connection.commit()
cursor.close()
insert_message
函数将消息数据插入到SensorMessages
表中。
定义ROS话题订阅回调函数
def ros_callback(message):
topic_name = '/ros_topic_name'
connection = create_database_connection()
if connection is not None:
insert_message(connection, topic_name, message.data)
connection.close()
ros_callback
函数会在接收到ROS话题消息时被调用,然后将消息保存到数据库。
设置ROS节点和订阅话题
def main():
rospy.init_node('ros_sql_server_saver', anonymous=True)
rospy.Subscriber('/ros_topic_name', String, ros_callback)
rospy.spin()
在main
函数中,我们初始化了一个ROS节点,并订阅了名为/ros_topic_name
的话题。每当收到消息时,ros_callback
函数就会被调用。
第四步:运行Python脚本
保存上述脚本为 ros_sql_saver.py
,确保SQL Server服务正在运行,并使用以下命令来运行脚本:
python ros_sql_saver.py
脚本将启动并订阅ROS话题。一旦接收到消息,它将调用ros_callback
函数,并将接收到的数据存储到SQL Server数据库中。
运行结果
我们直接使用命令发送话题
读取一下数据库内容:
评论(0)
您还未登录,请登录后发表或查看评论