1. 概述


在《用C语言实现mosquitto MQTT订阅消息》


https://zhuanlan.zhihu.com/p/365190438


https://blog.csdn.net/chentuo2000/article/details/115747492


一文中我们用C语言实现了mosquitto MQTT同步订阅消息。mosquitto的同步函数是以阻塞方式工作的,也就是订阅程序一直等待接收消息,阻塞了其他程序的运行,效率很低。


下面我们用异步mosquitto的函数实现MQTT消息订阅,异步是非阻塞的方式,比同步方式性能更好


2. 修改例子subscribe


2.1 异步与同步程序的差异


  • 异步方式连接服务器的函数mosquitto_connect_async

该函数连接MQTT代理。这是一个非阻塞调用。如果使用mosquitto_connect_async,则客户端必须使用线程接口mosquitto_loop_start


注意:如果代码中直接使用mosquitto_loop循环,则必须使用同步mosquitto_connect函数连接代理。


例如用下面的循环代替mosquitto_loop_forever函数时


        while(running) {


                mosquitto_loop(mosq, -1, 1);


        }


  • 异步循环函数mosquitto_loop_start

此函数开启一个新线程,在线程里循环调用 mosquitto_loop。


而同步循环函数mosquitto_loop_forever在无限阻塞循环中调用mosquitto_loop。


mosquitto_loop是客户端的主循环函数,必须经常调用它以保持客户机和代理之间的通信正常工作。mosquitto_loop_forever和mosquitto_loop_start都是通过调用mosquitto_loop来实现的。


你也可以直接使用此函数,但不能在回调中调用它。


例如上面用循环mosquitto_loop代替mosquitto_loop_forever函数的例子。


2.2 重写C程序


  1. #include <mosquitto.h>
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <string.h>
  5. #include <unistd.h>
  6. // 定义运行标志决定是否需要结束
  7. static int running = 1;
  8. // 当客户端从代理接收到CONNACK消息时调用回调
  9. void on_connect(struct mosquitto _mosq, void _obj, int reason_code)
  10. {
  11. int rc;
  12. printf(“on_connect: %s\n”, mosquitto_connack_string(reason_code));
  13. if(reason_code != 0){
  14. mosquitto_disconnect(mosq);
  15. }
  16. rc = mosquitto_subscribe(mosq, NULL, “example/temperature”, 1);
  17. if(rc != MOSQ_ERR_SUCCESS){
  18. fprintf(stderr, “Error subscribing: %s\n”, mosquitto_strerror(rc));
  19. mosquitto_disconnect(mosq);
  20. }
  21. }
  22. // 当代理在响应订阅发送SUBACK时调用回调
  23. void on_subscribe(struct mosquitto _mosq, void _obj, int mid, int qos_count, const int _granted_qos)
  24. {
  25. int i;
  26. bool have_subscription = false;
  27. for(i=0; i<qos_count; i++){
  28. printf(“on_subscribe: %d:granted qos = %d\n”, i, granted_qos[i]);
  29. if(granted_qos[i] <= 2){
  30. have_subscription = true;
  31. }
  32. }
  33. if(have_subscription == false){
  34. fprintf(stderr, “Error: All subscriptions rejected.\n”);
  35. mosquitto_disconnect(mosq);
  36. }
  37. }
  38. // 当客户端收到消息时调用回调该函数
  39. void on_message(struct mosquitto _mosq, void _obj, const struct mosquitto_message _msg)
  40. {
  41. // 打印有效载荷
  42. printf(“%s %d %s\n”, msg->topic, msg->qos, (char _)msg->payload);
  43. }
  44. // 当断开连接时调用回调该函数
  45. void on_disconnect(struct mosquitto _mosq, void _obj, int rc)
  46. {
  47. printf(“Call the function: my_disconnect_callback\n”);
  48. running = 0;
  49. }
  50. int main(int argc, char _argv[])
  51. {
  52. struct <span class=”hljs-title class_“>mosquitto *mosq;
  53. int rc;
  54. // 初始化mosquitto库
  55. mosquitto_lib_init();
  56. // 创建新的客户端实例。
  57. mosq = mosquitto_new(NULL, true, NULL);
  58. if(mosq == NULL){
  59. fprintf(stderr, “Error: Out of memory.\n”);
  60. return 1;
  61. }
  62. // 配置回调函数
  63. mosquitto_connect_callback_set(mosq, on_connect);
  64. mosquitto_subscribe_callback_set(mosq, on_subscribe);
  65. mosquitto_message_callback_set(mosq, on_message);
  66. mosquitto_disconnect_callback_set(mosq, on_disconnect);
  67. // 连接服务器
  68. mosquitto_username_pw_set(mosq, “ct”, “1qaz2wsx”);
  69. rc = mosquitto_connect_async (mosq, “raspberrypi”, 1883, 60);
  70. if(rc != MOSQ_ERR_SUCCESS){
  71. mosquitto_destroy(mosq);
  72. fprintf(stderr, “Error: %s\n”, mosquitto_strerror(rc));
  73. return 1;
  74. }
  75. // 异步循环
  76. rc = mosquitto_loop_start(mosq);
  77. if(rc != MOSQ_ERR_SUCCESS)
  78. {
  79. mosquitto_destroy(mosq);
  80. fprintf(stderr, “Error: %s\n”, mosquitto_strerror(rc));
  81. return 1;
  82. }
  83. // 开始循环
  84. printf(“Start!\n”);
  85. while(running)
  86. {
  87. sleep(1);
  88. }
  89. // 结束后的清理工作
  90. mosquitto_destroy(mosq);
  91. mosquitto_lib_cleanup();
  92. printf(“End!\n”);
  93. return 0;
  94. }



 


2.3 修改例子


  • 进入例子目录

cd mosquitto-2.0.9/examples/subscribe/



  • 备份一下原来的代码

cp basic-1.c basic-1.c-bak


  • 编辑basic-1.c

nano basic-1.c


用重写的C程序替换原来的程序。


3.3 编译


  • 编译

gcc -o basic-1 basic-1.c -lmosquitto



3.4 测试


  • 运行basic-1

./basic-1



从rc = mosquitto_subscribe(mosq, NULL, “example/temperature”, 1);可知,订阅的消息主题为example/temperature。


  • 再开一个窗口发布消息

mosquitto_pub -p 1883 -u ct -P xxxxxxxx -t example/temperature -m “26.6”



  • 订阅测试窗口收到消息


  • 用MQTT.fx远程测试

详细说明见《树莓派MQTT服务远程测试MQTT.fx


https://zhuanlan.zhihu.com/p/363373024


https://blog.csdn.net/chentuo2000/article/details/115539377



点击Publish:


订阅测试窗口收到消息



 


参考文档


  1. MQTT通信协议(mosquitto)发布订阅例子C语言实现
    https://blog.csdn.net/qq_33406883/article/details/107466430