在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。

应用:线程间通信/进程间通信。

Linux系统中提供了两种不同接口的消息队列:

  • POSIX消息队列。POSIX为可移植的操作系统接口。
  • System V消息队列。System V 是 AT&T 的第一个商业UNIX版本(UNIX System III)的加强。

其中,POSIX消息队列可移植性较强,使用较广。

Linux系统中提供的消息队列一般应用于进行间通信,但也可以用于线程间通信。

本文介绍POSIX消息队列应用于线程间通信。

头文件:

#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>

编译链接需要加上 -lrt 链接。

Linux内核提供了一系列函数来使用消息队列:

/**
 * @brief 创建消息队列实例
 *
 * Detailed function description
 *
 * @param[in] name: 消息队列名称
 * @param[in] oflag:根据传入标识来创建或者打开一个已创建的消息队列
                    - O_CREAT: 创建一个消息队列
                    - O_EXCL: 检查消息队列是否存在,一般与O_CREAT一起使用
                    - O_CREAT|O_EXCL: 消息队列不存在则创建,已存在返回NULL
                    - O_NONBLOCK: 非阻塞模式打开,消息队列不存在返回NULL
                    - O_RDONLY: 只读模式打开
                    - O_WRONLY: 只写模式打开
                    - O_RDWR: 读写模式打开
 * @param[in] mode:访问权限
 * @param[in] attr:消息队列属性地址
 *
 * @return 成功返回消息队列描述符,失败返回-1,错误码存于error中
 */
mqd_t mq_open(const char *name, int oflag,  mode_t mode, struct mq_attr *attr);

/**
 * @brief 无限阻塞方式接收消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:消息体缓冲区地址
 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
 * @param[in] msg_prio:消息优先级
 *
 * @return 成功返回消息长度,失败返回-1,错误码存于error中
 */
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

/**
 * @brief 指定超时时间阻塞方式接收消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:消息体缓冲区地址
 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
 * @param[in] msg_prio:消息优先级
 * @param[in] abs_timeout:超时时间
 *
 * @return 成功返回消息长度,失败返回-1,错误码存于error中
 */
mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout);

/**
 * @brief 无限阻塞方式发送消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:待发送消息体缓冲区地址
 * @param[in] msg_len:消息体长度
 * @param[in] msg_prio:消息优先级
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

/**
 * @brief 指定超时时间阻塞方式发送消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:待发送消息体缓冲区地址
 * @param[in] msg_len:消息体长度
 * @param[in] msg_prio:消息优先级
 * @param[in] abs_timeout:超时时间
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout);

/**
 * @brief 关闭消息队列
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_close(mqd_t mqdes);

/**
 * @brief 分离消息队列
 *
 * Detailed function description
 *
 * @param[in] name: 消息队列名称
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_unlink(const char *name);

例子:线程1不断给线程2发送字符串数据。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h>        /* For mode constants */
#include <mqueue.h>

#define MQ_MSG_MAX_SIZE    512	 ///< 最大消息长度 
#define MQ_MSG_MAX_ITEM    5	 ///< 最大消息数目

static pthread_t s_thread1_id;
static pthread_t s_thread2_id;
static unsigned char s_thread1_running = 0;
static unsigned char s_thread2_running = 0;

static mqd_t s_mq;
static char send_msg[10] = "hello";

void *thread1_fun(void *arg)
{
    int ret = 0;

    s_thread1_running = 1;
    while (s_thread1_running)  
    {
		ret = mq_send(s_mq, send_msg, sizeof(send_msg), 0);
		if (ret < 0)
		{
        	perror("mq_send error");
		}
        printf("send msg = %s\n", send_msg);
        usleep(100 * 1000);
    }
    
    pthread_exit(NULL);
}

void *thread2_fun(void *arg)
{
	char  buf[MQ_MSG_MAX_SIZE];
	int recv_size = 0;

    s_thread2_running = 1;
    while (s_thread2_running)
    {
		recv_size = mq_receive(s_mq, &buf[0], sizeof(buf), NULL);
		if (-1 != recv_size)
		{
			printf("receive msg = %s\n", buf);
		}
		else
		{
			perror("mq_receive error");
			break;
		}

        usleep(100 * 1000);
    }
    
    pthread_exit(NULL);
}

int main(void)
{
    int ret = 0;
    struct mq_attr attr;

    ///< 创建消息队列
    memset(&attr, 0, sizeof(attr));
    attr.mq_maxmsg = MQ_MSG_MAX_ITEM;
    attr.mq_msgsize = MQ_MSG_MAX_SIZE;
    attr.mq_flags = 0;
    s_mq = mq_open("/mq", O_CREAT|O_RDWR, 0777, &attr);
	if(-1 == s_mq)
    {
        perror("mq_open error");
        return -1;
    }

    ///< 创建线程1
    ret = pthread_create(&s_thread1_id, NULL, thread1_fun, NULL);
    if (ret != 0)
    {
        printf("thread1_create error!\n");
        exit(EXIT_FAILURE);
    }
    ret = pthread_detach(s_thread1_id);
    if (ret != 0)
    {
        printf("s_thread1_id error!\n");
        exit(EXIT_FAILURE);
    }

    ///< 创建线程2
    ret = pthread_create(&s_thread2_id, NULL, thread2_fun, NULL);
    if (ret != 0)
    {
        printf("thread2_create error!\n");
        exit(EXIT_FAILURE);
    }
    ret = pthread_detach(s_thread2_id);
    if (ret != 0)
    {
        printf("s_thread2_id error!\n");
        exit(EXIT_FAILURE);
    }

    while (1)
    {
        sleep(1);
    }

    return 0;
}

编译、运行: