陈拓 2021/04/17-2021/04/19

1. 概述


在《用C语言实现mosquitto MQTT订阅消息》


https://zhuanlan.zhihu.com/p/365190438


https://blog.csdn.net/chentuo2000/article/details/115747492


和《用C语言实现mosquitto MQTT订阅消息(异步)》


https://zhuanlan.zhihu.com/p/365483724


https://blog.csdn.net/chentuo2000/article/details/115786111


两篇文章中我们已经通过MQTT订阅收到了客户端的消息,本文介绍将其中的数据存储到MySQL数据库中。


在《MQTT服务器Mosquitto 2.x编译安装配置》


https://zhuanlan.zhihu.com/p/365103802


https://blog.csdn.net/chentuo2000/article/details/115731687


一文中我们下载了mosquitto 2.x源码,其中有个写MySQL数据库的例子:



在GitHub上mosquitto官方也有这个例子:


https://github.com/eclipse/mosquitto/blob/master/examples/mysql_log/mysql_log.c



2. 查看mysql_log.c代码


cat mosquitto-2.0.9/examples/mysql_log/mysql_log.c



见“附录:mysql_log.c源代码”。


3. 安装MySQL数据库


见《树莓派安装使用数据库MariaDB (MySQL)》


https://blog.csdn.net/chentuo2000/article/details/108702880


4. 创建数据表


  • 查看MySQL进程

ps -ef | grep mysql



MySQL已启动。


  • 创建MySQL用户

在《树莓派安装使用数据库MariaDB (MySQL)》


https://blog.csdn.net/chentuo2000/article/details/108702880


一文中我们创建了用户ct。


  • 以用户ct登录MySQL

mysql -uct -pct



  • 查看数据库

show databases;



在《树莓派安装使用数据库MariaDB (MySQL)》


https://blog.csdn.net/chentuo2000/article/details/108702880


一文中我们创建了数据库smarthome。


  • 打开数据库

use smarthome;



  • 查看数据库smarthome中的表

show tables;



  • 查看表结构

describe temperature;



5. 修改C程序


  • 再开一个终端窗口
  • 进入mysql_log目录

cd mosquitto-2.0.9/examples/mysql_log/



  • 修改C程序mysql_log.c

有关C语言对MySQL的操作请看《树莓派采集温度数据并存入数据库(C语言版)》


https://blog.csdn.net/chentuo2000/article/details/108779497


一文。


针对我的环境进行修改:


  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <signal.h>
  6. #include <mosquitto.h>
  7. #include <mysql.h>
  8. #define db_host “localhost”
  9. #define db_username “ct”
  10. #define db_password “ct”
  11. #define db_database “smarthome”
  12. #define db_port 3306
  13. #define db_query “INSERT INTO temperature (deviceid, celsius_temp) VALUES (?,?)”
  14. #define mqtt_host “localhost”
  15. #define mqtt_port 1883
  16. static int run = 1;
  17. static MYSQL_STMT _stmt = NULL;
  18. void handle_signal(int s)
  19. {
  20. printf(“\n Capture sign no:%d\n”, s);
  21. run = 0;
  22. }
  23. void connect_callback(struct mosquitto _mosq, void _obj, int reason_code)
  24. {
  25. }
  26. void message_callback(struct mosquitto _mosq, void _obj, const struct mosquitto_message _message)
  27. {
  28. printf(“%s %d %s\n”, message->topic, message->qos, (char _)message->payload);
  29. MYSQL_BIND bind[2];
  30. memset(bind, 0, sizeof(bind));
  31. bind[0].buffer_type = MYSQL_TYPE_STRING;
  32. bind[0].buffer = message->topic;
  33. bind[0].buffer_length = strlen(message->topic);
  34. // Note: payload is normally a binary blob and could contains
  35. // NULL byte. This sample does not handle it and assume payload is a
  36. // string.
  37. bind[1].buffer_type = MYSQL_TYPE_STRING;
  38. bind[1].buffer = message->payload;
  39. bind[1].buffer_length = message->payloadlen;
  40. mysql_stmt_bind_param(stmt, bind);
  41. mysql_stmt_execute(stmt);
  42. }
  43. int main(int argc, char _argv[])
  44. {
  45. MYSQL *connection;
  46. my_bool reconnect = true;
  47. char clientid[24];
  48. struct <span class=”hljs-title class_“>mosquitto _mosq;
  49. int rc = 0;
  50. signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号
  51. signal(SIGTERM, handle_signal); // 程序结束(terminate)信号
  52. mysql_library_init(0, NULL, NULL);
  53. mosquitto_lib_init();
  54. connection = mysql_init(NULL);
  55. if(connection){
  56. mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect);
  57. connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0);
  58. if(connection){
  59. stmt = mysql_stmt_init(connection);
  60. mysql_stmt_prepare(stmt, db_query, strlen(db_query));
  61. memset(clientid, 0, 24);
  62. snprintf(clientid, 23, “mysql_log_%d”, getpid());
  63. mosq = mosquitto_new(clientid, true, connection);
  64. if(mosq){
  65. mosquitto_connect_callback_set(mosq, connect_callback);
  66. mosquitto_message_callback_set(mosq, message_callback);
  67. mosquitto_username_pw_set(mosq, “ct”, “1qaz2wsx”);
  68. rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60);
  69. mosquitto_subscribe(mosq, NULL, “#”, 0);
  70. while(run){
  71. rc = mosquitto_loop(mosq, -1, 1);
  72. if(run && rc){
  73. sleep(20);
  74. //mosquitto_reconnect(mosq);
  75. }
  76. }
  77. mosquitto_reconnect(mosq);
  78. }
  79. mysql_stmt_close(stmt);
  80. mysql_close(connection);
  81. }else{
  82. fprintf(stderr, “Error: Unable to connect to database.\n”);
  83. printf(“%s\n”, mysql_error(connection));
  84. rc = 1;
  85. }
  86. }else{
  87. fprintf(stderr, “Error: Unable to start mysql.\n”);
  88. rc = 1;
  89. }
  90. mysql_library_end();
  91. mosquitto_lib_cleanup();
  92. return rc;
  93. }



说明:


(1) 通配符#的使用


mosquitto_subscribe(mosq, NULL, “#”, 0);


在订阅中使用通配符可以收到多个主题的消息,通配符的详细用法可以看MQTT文档。


(2) CTRL+c正常结束程序


signal(SIGINT, handle_signal); // 捕捉终端CTRL+c产生的SIGINT信号


通过捕捉终端操作CTRL+c使程序能够正常退出以释放资源,而不是用CTRL+z强行终止程序的运行。


(3) 和《用C语言实现mosquitto MQTT订阅消息》


https://zhuanlan.zhihu.com/p/365190438


https://blog.csdn.net/chentuo2000/article/details/115747492


一文中调用无限循环的方法mosquitto_loop_forever(mosq, -1, 1);不同,这里采用等价的方法:


  1. while(run){
  2. rc = mosquitto_loop(mosq, -1, 1);
  3. if(run && rc){
  4. sleep(20);
  5. //mosquitto_reconnect(mosq);
  6. }
  7. }

这种方法更具灵活性。


  • 编辑mysql_log.c

nano mysql_log.c



6. 修改Makefile文件


  • 例子中的Makefile文件

cat Makefile



  • 修改为

  1. CFLAGS=-Wall -ggdb
  2. LDFLAGS=../../lib/libmosquitto.so.1 -lmariadbclient -lsqlite3
  3. .PHONY: all clean
  4. all : mosquitto_mysql_log
  5. mosquitto_mysql_log : mysql_log.o
  6. ${CC} $^ -o $@ ${LDFLAGS}
  7. mysql_log.o : mysql_log.c
  8. ${CC} -c $^ -o $@ ${CFLAGS} -I../../include -I/usr/include/mariadb
  9. clean :
  10. -rm -f _.o mosquitto_mysql_log



根据我的MySQL和mosquitto的头文件和库文件的位置修改。


  • 头文件和库文件的位置

(1) MySQL头文件


ls -l /usr/include/mariadb/mysql.h



(2) MySQL库文件


ls -l /usr/lib/arm-linux-gnueabihf/libmariadbclient.so



ls -l /usr/lib/arm-linux-gnueabihf/libsqlite3.so



libmariadbclient.so和libsqlite3.so在默认库文件的目录/usr/lib中,不需要指出路径。


(3) mosquitto头文件


ls -l ../../include/mosquitto.h



(4) mosquitto库文件


ls -l ../../lib/libmosquitto.so.1



7. 编译


make




8. 本地测试


  • 运行./mosquitto_mysql_log


订阅subscribe程序进入循环等待接收消息。


  • 在另一个终端窗口中发布publish消息


订阅窗口收到消息,显示topic、qos、payload,并将topic和payload存入数据库,对用表temperature的deviceid和celsius_temp字段。



按CTRL+c可以正常推出程序,这样可以完成资源释放和回收工作。按CTRL+z可以强行中断程序运行,但跳过了资源释放语句的执行。


重新运行./mosquitto_mysql_log


  • 再发布消息


订阅窗口收到消息:



可以看到,在mosquitto_subscribe(mosq, NULL, “#”, 0);语句中使用了通配符后订阅程序可以接收不同主题的消息。在这里我们可以用主题temperature001和temperature002表示两个温度传感器。


  • 查看数据库

在数据库窗口查看刚才收到并保存的数据:


select _ from temperature where deviceid like ‘temperature%’;



9. 远程测试


  • 用MQTT.fx测试

详细说明见《树莓派MQTT服务远程测试MQTT.fx》


https://zhuanlan.zhihu.com/p/363373024


https://blog.csdn.net/chentuo2000/article/details/115539377



点击Publish


  • 订阅测试窗口收到消息


  • 查看数据库


 


附录:mysql_log.c源代码


  1. #include <signal.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. #ifndef WIN32
  5. # include <unistd.h>
  6. #else
  7. # include <process.h>
  8. # define snprintf sprintf_s
  9. #endif
  10. #include <mosquitto.h>
  11. #include <mysql/mysql.h>
  12. #define db_host “localhost”
  13. #define db_username “mqtt_log”
  14. #define db_password “password”
  15. #define db_database “mqtt_log”
  16. #define db_port 3306
  17. #define db_query “INSERT INTO mqtt_log (topic, payload) VALUES (?,?)”
  18. #define mqtt_host “localhost”
  19. #define mqtt_port 1883
  20. static int run = 1;
  21. static MYSQL_STMT _stmt = NULL;
  22. void handle_signal(int s)
  23. {
  24. run = 0;
  25. }
  26. void connect_callback(struct mosquitto _mosq, void _obj, int result)
  27. {
  28. }
  29. void message_callback(struct mosquitto _mosq, void _obj, const struct mosquitto_message _message)
  30. {
  31. MYSQL_BIND bind[2];
  32. memset(bind, 0, sizeof(bind));
  33. bind[0].buffer_type = MYSQL_TYPE_STRING;
  34. bind[0].buffer = message->topic;
  35. bind[0].buffer_length = strlen(message->topic);
  36. // Note: payload is normally a binary blob and could contains
  37. // NULL byte. This sample does not handle it and assume payload is a
  38. // string.
  39. bind[1].buffer_type = MYSQL_TYPE_STRING;
  40. bind[1].buffer = message->payload;
  41. bind[1].buffer_length = message->payloadlen;
  42. mysql_stmt_bind_param(stmt, bind);
  43. mysql_stmt_execute(stmt);
  44. }
  45. int main(int argc, char _argv[])
  46. {
  47. MYSQL *connection;
  48. my_bool reconnect = true;
  49. char clientid[24];
  50. struct <span class=”hljs-title class_“>mosquitto *mosq;
  51. int rc = 0;
  52. signal(SIGINT, handle_signal);
  53. signal(SIGTERM, handle_signal);
  54. mysql_library_init(0, NULL, NULL);
  55. mosquitto_lib_init();
  56. connection = mysql_init(NULL);
  57. if(connection){
  58. mysql_options(connection, MYSQL_OPT_RECONNECT, &reconnect);
  59. connection = mysql_real_connect(connection, db_host, db_username, db_password, db_database, db_port, NULL, 0);
  60. if(connection){
  61. stmt = mysql_stmt_init(connection);
  62. mysql_stmt_prepare(stmt, db_query, strlen(db_query));
  63. memset(clientid, 0, 24);
  64. snprintf(clientid, 23, “mysql_log_%d”, getpid());
  65. mosq = mosquitto_new(clientid, true, connection);
  66. if(mosq){
  67. mosquitto_connect_callback_set(mosq, connect_callback);
  68. mosquitto_message_callback_set(mosq, message_callback);
  69. rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60);
  70. mosquitto_subscribe(mosq, NULL, “#”, 0);
  71. while(run){
  72. rc = mosquitto_loop(mosq, -1, 1);
  73. if(run && rc){
  74. sleep(20);
  75. mosquitto_reconnect(mosq);
  76. }
  77. }
  78. mosquitto_destroy(mosq);
  79. }
  80. mysql_stmt_close(stmt);
  81. mysql_close(connection);
  82. }else{
  83. fprintf(stderr, “Error: Unable to connect to database.\n”);
  84. printf(“%s\n”, mysql_error(connection));
  85. rc = 1;
  86. }
  87. }else{
  88. fprintf(stderr, “Error: Unable to start mysql.\n”);
  89. rc = 1;
  90. }
  91. mysql_library_end();
  92. mosquitto_lib_cleanup();
  93. return rc;
  94. }