0%

POSIX消息队列

POSIX消息队列允许进程以消息的形式交换数据,与System V消息队列不同,但提供了类似的功能。

使用条件

若想使用POSIX消息队列,kernel需要打开配置CONFIG_POSIX_MQUEUE(默认情况是打开的)。编译程序时需要加上-lrt参数。

使用方法

打开或创建消息队列

使用mq_open()创建或打开一个消息队列,这个函数会返回一个消息队列描述符(mdq_t),之后的函数调用会使用到这个描述符。返回-1表示打开失败,错误原因可以通过errno获取。

1
2
3
4
5
6
#include <fcntl.h>           /* For O_* constants */
#include <sys/stat.h> /* For mode constants */
#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag);
mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);

消息队列使用参数name标识,其格式为/somenamename是一个以空字符结尾、长度不大于NAME_MAX(i.e., 255)的字符串,以斜线/开头,后续是一个或多个不是斜线的字符。若两个进程需要操作同一个消息队列,需要使用相同的name

oflag参数用于控制函数的动作,与open()函数的标志位非常类似。

  • O_RDONLY 只从消息队列中接收数据
  • O_WRONLY 只发送数据到消息队列中
  • O_RDWR 发送和接收数据
  • O_CLOEXEC
  • O_CREAT 如果消息队列不存在,则创建
  • O_EXCL 如果指定了O_CREATE且消息队列存在,则返回错误 EEXIST
  • O_NONBLOCK

O_RDONLYO_WRONLYO_RDWR 这三个只能指定一个,且必须指定一个,其余标志可以指定0个或多个,通过位或运算拼接。如果指定了O_CREATE,还需要指定modeattr参数。

mode参数用于指定消息队列的权限。

attr参数是struct mq_attr结构体指针。mq_open()只会使用mq_maxmsgmq_msgsize,忽略其他的值。如果attr是NULL,则会使用默认值。后面会介绍默认值。

mq_maxmsg指消息队列最多能存储多少个消息。mq_msgsize指每个消息的最大大小。

1
2
3
4
5
6
7
struct mq_attr {
long mq_flags; /* Flags (ignored for mq_open()) */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue
(ignored for mq_open()) */
};

消息队列描述符本质上是一个文件描述符。所以文件描述符的特性也适用于消息队列描述符。例如fork()后,子进程会继承父进程的消息队列描述符,且指向同一个消息队列。

发送消息

使用mq_send()发送消息到消息队列。参数mqdes是消息队列描述符,表示消息将要发送到的队列。msg_ptr是指向消息的指针,其长度是msg_len,长度必须小于或等于消息队列的mq_msqsize属性。msg_prio是一个非负整数,用于表示消息的优先级。消息在队列中按照优先级降序排序,同优先级的消息放在旧消息的后面。

如果队列已经满了(消息数量等于队列的mq_maxmsg属性),mq_send()会一直阻塞直到有足够的空间放入消息,或被信号中断。如果消息队列使能了O_NONBLOCK标志,调用会立即返回错误EAGAIN

1
2
3
4
#include <mqueue.h>

int mq_send(mqd_t mqdes, const char msg_ptr[.msg_len],
size_t msg_len, unsigned int msg_prio);

mq_timedsend()的行为类似于mq_send()。消息队列满了并且O_NONBLOCK标志没有使能的情况下,abs_timeout表示调用会阻塞多久。值是从1970-01-01 00:00:00 +0000 (UTC)开始的绝对时间,以秒和纳秒为单位。如果消息队列已满并且调用时已经超时,会立即返回。

1
2
3
4
5
6
#include <time.h>
#include <mqueue.h>

int mq_timedsend(mqd_t mqdes, const char msg_ptr[.msg_len],
size_t msg_len, unsigned int msg_prio,
const struct timespec *abs_timeout);

成功发送消息,mq_send()mq_timedsend()返回0。如果有错误则返回-1errno表示错误。

每个消息都会关联一个优先级,并且高优先级消息会被优先发送给接收进程。消息优先级的范围是从 0(低)到sysconf(_SC_MQ_PRIO_MAX)-1(高)。在Linux操作系统,sysconf(_SC_MQ_PRIO_MAX) 返回 32768。POSIX.1只要求实现0~31。

接收消息

使用mq_receive()从消息队列接收消息。mq_receive()移除消息队列mqdes中优先级最高、时间最久的消息,并将消息放到msg_ptr指向的缓存区,msg_len指定了缓冲区的长度,其值必须大于或等于消息队列的mq_msgsize属性。如果msg_prio不是NULL,那么用于返回接收到消息的优先级。

如果队列是空的,默认情况下,mq_receive()会一直阻塞直到有消息可用,或被信号中断。如果消息队列使能了O_NONBLOCK标志,调用会立即返回错误EAGAIN

1
2
3
4
#include <mqueue.h>

ssize_t mq_receive(mqd_t mqdes, char msg_ptr[.msg_len],
size_t msg_len, unsigned int *msg_prio);

mq_timedreceive()的行为类似于mq_receive()。消息队列为空并且O_NONBLOCK标志没有使能的情况下,abs_timeout表示调用会阻塞多久。值是从1970-01-01 00:00:00 +0000 (UTC)开始的绝对时间,以秒和纳秒为单位。如果消息队列为空并且调用时已经超时,会立即返回。

1
2
3
4
5
6
#include <time.h>
#include <mqueue.h>

ssize_t mq_timedreceive(mqd_t mqdes, char *restrict msg_ptr[.msg_len],
size_t msg_len, unsigned int *restrict msg_prio,
const struct timespec *restrict abs_timeout);

成功接收消息,mq_receive()mq_timedreceive()返回0。如果有错误则返回-1errno表示错误。

关闭消息队列

当进程不再使用消息队列时,可以使用mq_close()关闭消息队列。

如果调用进程为消息队列注册了通知请求,那么通知请求会被移除。同时其他的进程就可以注册通知请求了。

1
2
3
#include <mqueue.h>

int mq_close(mqd_t mqdes);

mq_close()只是为当前进程关闭消息队列,其他进程的消息队列可能还处于打开的状态。

删除消息队列

当不需要消息队列的时候,可以使用mq_unlink()移除指定的消息队列name。消息队列名字会立即移除。消息队列本身会在所有引用队列的描述符关闭后销毁。

1
2
3
#include <mqueue.h>

int mq_unlink(const char *name);

如果只是用mq_close关闭消息队列,而没有调用mq_unlink()删除消息队列,那么消息队列会一直存在系统中,直到系统关闭。

消息队列属性

使用mq_getattr()获取消息队列属性,使用mq_setatrr()设置消息队列属性。

1
2
3
4
5
#include <mqueue.h>

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, const struct mq_attr *restrict newattr,
struct mq_attr *restrict oldattr);

消息队列属性是一个struct mq_attr结构体。mq_flags字段包含打开消息队列时关联的标志,这个字段只会出现标志O_NONBLOCK

mq_maxmsgmq_msgsize字段可以在使用mq_open()打开消息队列时设置。

mq_maxmsg字段是消息数量的上限。mq_msgsize字段是消息大小的的上限。这两个字段的值必须大于0。在使用mq_open()打开消息队列的时候会设置这两个字段。

mq_curmsgs字段包含消息队列的消息数量。

1
2
3
4
5
6
struct mq_attr {
long mq_flags; /* Flags: 0 or O_NONBLOCK */
long mq_maxmsg; /* Max. # of messages on queue */
long mq_msgsize; /* Max. message size (bytes) */
long mq_curmsgs; /* # of messages currently in queue */
};

mq_setattr()使用newattr修改消息队列的属性。但只有mq_flags字段的O_NONBLOCK标志可以修改,其他字段会被忽略。如果oldattr不是NULL,用于返回消息队列修改之前的属性。

异步通知

TODO

https://www.man7.org/linux/man-pages/man3/mq_notify.3.html

库接口和系统调用

Library interface System call
mq_close(3) close(2)
mq_getattr(3) mq_getsetattr(2)
mq_notify(3) mq_notify(2)
mq_open(3) mq_open(2)
mq_receive(3) mq_timedreceive(2)
mq_send(3) mq_timedsend(2)
mq_setattr(3) mq_getsetattr(2)
mq_timedreceive(3) mq_timedreceive(2)
mq_timedsend(3) mq_timedsend(2)
mq_unlink(3) mq_unlink(2)

/proc接口

以下接口可用于限制POSIX消息队列使用的内存总量,设置新消息队列的默认属性。

/proc/sys/fs/mqueue/msg_default

mq_maxmsg属性的默认值。用mq_open()创建队列并且attr参数是NULL时,新队列的mq_maxmsg属性会使用这个文件定义的值。这个文件的默认值是10。

/proc/sys/fs/mqueue/msg_default

TODO

/proc/sys/fs/mqueue/msg_max

TODO

/proc/sys/fs/mqueue/msgsize_default

TODO

/proc/sys/fs/mqueue/msgsize_max

TODO

/proc/sys/fs/mqueue/queues_max

TODO

消息队列文件系统

在Linux,消息队列创建在虚拟文件系统。可以使用如下命令挂载。

1
2
mkdir /dev/mqueue
mount -t mqueue none /dev/mqueue

挂载文件系统后,可以使用操作文件的命令来查看和操作消息队列。例如lsrm。这个文件夹的文件包含队列的信息。

1
2
$ cat /dev/mqueue/mymq
QSIZE:129 NOTIFY:2 SIGNO:0 NOTIFY_PID:8260

字段说明如下:

  • QSIZE:队列中所有消息的总大小,单位字节。
  • NOTIFY_PID:如果非0,则此进程使用mq_notify()注册了异步通知。其余字段描述通知是如何发生的。
  • NOTIFY:通知的方法。0:SIGEV_SIGNAL;1:SIGEV_NONE;2:SIGEV_THREAD。
  • SIGNO:SIGEV_SIGNAL使用的信号编号。

POSIX消息队列与SystemV消息队列对比

TODO

测试程序

mqueue.c

查看消息队列的系统参数

./mqueue server启动服务端,会打印与消息队列相关的系统参数。Ctrl-C退出,cat /dev/mqueue/test_posix_message_queue可以看到消息队列的一些数据。可以看到现在消息队列占用的空间是0。

1
2
3
4
5
6
7
8
9
10
$ ./mqueue server
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msg_default: 10
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msg_max: 10
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msgsize_default: 8192
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msgsize_max: 8192
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/queues_max: 256
[INFO][show_mqueue_info:0206] SC_MQ_PRIO_MAX: 32768
^C
$ cat /dev/mqueue/test_posix_message_queue
QSIZE:0 NOTIFY:0 SIGNO:0 NOTIFY_PID:0

发送阻塞

在服务器没有启动的情况下,执行命令./mqueue client 1 2 3 4 5 6 7 8 9 10 11发送11个消息。可以看到,消息11没有发送成功,说明消息队列满了。为了测试消息优先级,第一个消息优先级是1,第二个消息优先级是2,依此类推。

1
2
3
4
5
6
7
8
9
10
11
12
$ ./mqueue client 1 2 3 4 5 6 7 8 9 10 11
[INFO][client:0129] mqd = 3
[INFO][client:0140] 发送成功:1
[INFO][client:0140] 发送成功:2
[INFO][client:0140] 发送成功:3
[INFO][client:0140] 发送成功:4
[INFO][client:0140] 发送成功:5
[INFO][client:0140] 发送成功:6
[INFO][client:0140] 发送成功:7
[INFO][client:0140] 发送成功:8
[INFO][client:0140] 发送成功:9
[INFO][client:0140] 发送成功:10

在第二个终端执行cat /dev/mqueue/test_posix_message_queue,可以看到消息队列占用了11个字节的空间,符合预期。

1
2
$ cat /dev/mqueue/test_posix_message_queue
QSIZE:11 NOTIFY:0 SIGNO:0 NOTIFY_PID:0

消息优先级

基于上一个实验,在第二个终端执行./mqueue server。消息10最先接收到,消息队列有个空位了,所以消息11发送成功。这时消息11的优先级最高,所以第二个收到的消息是11,然后是消息9,依次类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 第一个终端
[INFO][client:0140] 发送成功:9
[INFO][client:0140] 发送成功:10
[INFO][client:0140] 发送成功:11
$

# 第二个终端
$ ./mqueue server
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msg_default: 10
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msg_max: 10
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msgsize_default: 8192
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/msgsize_max: 8192
[INFO][show_mqueue_info:0200] /proc/sys/fs/mqueue/queues_max: 256
[INFO][show_mqueue_info:0206] SC_MQ_PRIO_MAX: 32768
[INFO][server:0099] 第 1 个消息
[INFO][server:0100] 收到的消息:10
[INFO][server:0101] 消息长度:2
[INFO][server:0102] 优先级:10

[INFO][server:0099] 第 2 个消息
[INFO][server:0100] 收到的消息:11
[INFO][server:0101] 消息长度:2
[INFO][server:0102] 优先级:11

[INFO][server:0099] 第 3 个消息
[INFO][server:0100] 收到的消息:9
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:9

[INFO][server:0099] 第 4 个消息
[INFO][server:0100] 收到的消息:8
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:8

[INFO][server:0099] 第 5 个消息
[INFO][server:0100] 收到的消息:7
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:7

[INFO][server:0099] 第 6 个消息
[INFO][server:0100] 收到的消息:6
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:6

[INFO][server:0099] 第 7 个消息
[INFO][server:0100] 收到的消息:5
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:5

[INFO][server:0099] 第 8 个消息
[INFO][server:0100] 收到的消息:4
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:4

[INFO][server:0099] 第 9 个消息
[INFO][server:0100] 收到的消息:3
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:3

[INFO][server:0099] 第 10 个消息
[INFO][server:0100] 收到的消息:2
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:2

[INFO][server:0099] 第 11 个消息
[INFO][server:0100] 收到的消息:1
[INFO][server:0101] 消息长度:1
[INFO][server:0102] 优先级:1

参考