观前提醒:本博客介绍如何使用Python订阅ROS话题,并将接收到的消息保存到SQL数据库中,包括MySQL和SQL Server两种情况。

使用Python订阅ROS话题并将消息保存至MySQL数据库

下面我们将详细介绍如何使用Python订阅ROS话题,并将接收的数据保存到MySQL数据库中。这种技术可以用于机器人数据的记录、分析和回放。

第一步:安装Python依赖库

我们将使用rospy来订阅ROS话题,以及mysql-connector-python来连接MySQL数据库。在你的Python环境中,使用以下命令来安装这些库:

pip install mysql-connector-python

注意:rospy通常作为ROS的一部分而已经安装,因此不需要单独安装。

第二步:配置MySQL数据库

在MySQL中,你需要创建一个新的数据库和表来存储从ROS话题收到的消息。

  1. 登录到MySQL服务器。
  2. 创建一个新数据库:CREATE DATABASE ros_messages;
  3. 选择数据库:USE ros_messages;
  4. 创建一个表来存储消息:
CREATE TABLE sensor_messages (
    id INT AUTO_INCREMENT PRIMARY KEY,
    topic VARCHAR(100),
    message TEXT,
    received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

这将创建一个名为sensor_messages的表,其中包括idtopicmessagereceived_at字段。

第三步:编写Python脚本

现在,我们来编写Python脚本来订阅ROS话题,并将收到的消息保存到MySQL数据库中。

导入必要的模块

import rospy
from std_msgs.msg import String
import mysql.connector
from mysql.connector import Error

这里我们导入了rospy模块来订阅ROS话题,std_msgs.msg中的String用于指定消息类型,以及mysql.connector来管理MySQL数据库连接。

建立数据库连接

def create_db_connection():
    try:
        connection = mysql.connector.connect(
            host='localhost',
            database='ros_messages',
            user='your_username',
            password='your_password'
        )
        return connection
    except Error as e:
        print(e)
        return None

create_db_connection函数尝试连接到MySQL数据库,并在失败时打印错误。

定义消息处理函数

def handle_sensor_message(data):
    db_connection = create_db_connection()
    if db_connection is not None:
        cursor = db_connection.cursor()
        sql_insert_query = """
            INSERT INTO sensor_messages (topic, message)
            VALUES (%s, %s)
        """
        cursor.execute(sql_insert_query, ('/sensor_topic', data.data))
        db_connection.commit()
        cursor.close()
        db_connection.close()

handle_sensor_message是一个回调函数,用来处理接收到的消息。它将连接到数据库,插入新的消息记录,然后关闭数据库连接。

初始化ROS节点并订阅话题

def main():
    rospy.init_node('sensor_message_saver', anonymous=True)
    rospy.Subscriber('/sensor_topic', String, handle_sensor_message)
    rospy.spin()

main函数初始化ROS节点,订阅名为/sensor_topic的话题,并注册了handle_sensor_message作为回调函数。rospy.spin()使得脚本保持运行并等待消息的到来。

第四步:运行一下吧

将上述代码保存为sensor_message_saver.py。确保ROScore和MySQL服务正在运行后,执行以下命令来启动脚本:

python sensor_message_saver.py

现在,每当有新消息发布到/sensor_topic话题时,你的脚本就会将这些消息保存到MySQL数据库中。

话题发送

我们使用一个py发送话题

import rospy
from some_package.msg import SensorData  # 假设消息类型定义在 some_package 包中

# 初始化 ROS 节点
rospy.init_node('sensor_publisher')

# 定义发布者,发布到 /sensor_topic 话题,消息类型为 SensorData
pub = rospy.Publisher('/sensor_topic', SensorData, queue_size=10)

# 创建消息实例
sensor_msg = SensorData()
sensor_msg.sensor_id = 123
sensor_msg.temperature = 25.2
sensor_msg.humidity = 58

# 发布消息
pub.publish(sensor_msg)

# 等待直到节点关闭
rospy.spin()

查看一下数据库内话题信息有没有正确存放

注意事项

  • 如果数据库和ROS不运行在一个主机上,mySQL需要先进行配置局域网访问

使用Python订阅ROS话题并将消息保存至SQL Server数据库

下面我们将一步步介绍如何使用Python订阅话题,并将接收到的消息保存到SQL Server数据库中。

第一步:安装必要的库

首先,确保已经安装了用于ROS的Python库和用于SQL Server的数据库适配器pyodbc

在ROS中,通常我们会使用rospy库来进行话题的订阅和发布。对于SQL Server,我们将使用pyodbc库来与数据库进行交互。

pip install pyodbc

第二步:创建SQL Server数据库和表

在SQL Server Management Studio (SSMS) 或者通过sqlcmd工具,执行以下SQL脚本来创建一个数据库和表以存储来自ROS话题的消息。

CREATE DATABASE RosData;

USE RosData;

CREATE TABLE SensorMessages (
    Id INT PRIMARY KEY IDENTITY(1,1),
    TopicName NVARCHAR(255) NOT NULL,
    MessageData NVARCHAR(MAX) NOT NULL,
    ReceivedDateTime DATETIME NOT NULL DEFAULT GETDATE()
);

此脚本创建了一个名为RosData的数据库和一个名为SensorMessages的表,表中包括自增的主键Id,话题名称TopicName,消息数据MessageData和接收时间戳ReceivedDateTime

第三步:编写Python脚本来订阅ROS话题

现在我们将创建一个Python脚本来订阅ROS话题,并将接收到的消息保存到SQL Server数据库中。

导入必要的模块

import rospy
from std_msgs.msg import String  # 假设我们订阅的话题类型为std_msgs/String
import pyodbc

设置SQL Server数据库连接

def create_database_connection():
    try:
        connection = pyodbc.connect(
            'DRIVER={ODBC Driver 17 for SQL Server};'
            'SERVER=your_server;'
            'DATABASE=RosData;'
            'UID=your_username;'
            'PWD=your_password;'
        )
        return connection
    except pyodbc.Error as e:
        print(f"Error connecting to SQL Server: {e}")
        return None

这段代码定义了一个函数create_database_connection,用于建立与SQL Server数据库的连接。

插入数据到数据库

def insert_message(connection, topic_name, message_data):
    cursor = connection.cursor()
    query = "INSERT INTO SensorMessages (TopicName, MessageData) VALUES (?, ?)"
    cursor.execute(query, topic_name, message_data)
    connection.commit()
    cursor.close()

insert_message函数将消息数据插入到SensorMessages表中。

定义ROS话题订阅回调函数

def ros_callback(message):
    topic_name = '/ros_topic_name'
    connection = create_database_connection()
    if connection is not None:
        insert_message(connection, topic_name, message.data)
        connection.close()

ros_callback函数会在接收到ROS话题消息时被调用,然后将消息保存到数据库。

设置ROS节点和订阅话题

def main():
    rospy.init_node('ros_sql_server_saver', anonymous=True)
    rospy.Subscriber('/ros_topic_name', String, ros_callback)
    rospy.spin()

main函数中,我们初始化了一个ROS节点,并订阅了名为/ros_topic_name的话题。每当收到消息时,ros_callback函数就会被调用。

第四步:运行Python脚本

保存上述脚本为 ros_sql_saver.py,确保SQL Server服务正在运行,并使用以下命令来运行脚本:

python ros_sql_saver.py

脚本将启动并订阅ROS话题。一旦接收到消息,它将调用ros_callback函数,并将接收到的数据存储到SQL Server数据库中。

运行结果

我们直接使用命令发送话题

读取一下数据库内容: