POSIX Message Queues
POSIX message queues are a priority-driven IPC mechanism on Unix-like operating systems that are similar to the older System V messages. Multiple readers and writers can access a message queue and exchange arbitrary data simultanously. For each message, a priority is defined, resulting in a sorted message queue with the oldest message of the highest priority always being at the front.
Furthermore, a process can determine how many messages are currently on the
queue, the options and flags that have been set, and the number of processes
that are blocking to either send or receive. POSIX message queues exist in the
file system space and have a destinct path name. Therefore, the name of a
message queue always starts with a slash /
, and access to them is
managed using POSIX permissions.
Prerequisites
In contrast to System V messages, the POSIX equivalent is often not enabled by default. Depending on the operating system, a few configuration steps are involved to create the appropriate file system structure.
FreeBSD
On FreeBSD, make sure the kernel module
mqueuefs
is loaded, and a message queue file system is
mounted:
# kldload mqueuefs
# mkdir -p /mnt/mqueue
# mount -t mqueuefs null /mnt/mqueue
Add a kld_list
entry to /etc/rc.conf
to load the
kernel module at boot time:
$ sysrc kld_list+="mqueuefs"
You may furthermore want to add a mount point in /etc/fstab
:
# Device Mountpoint FStype Options Dump Pass
null /mnt/mqueue mqueuefs rw 0 0
We can get/set the system limits regarding the maximum number of messages and the maximum message size with sysctl(8):
$ sysctl kern.mqueue.maxmsg
kern.mqueue.maxmsg: 100
$ sysctl kern.mqueue.maxmsgsize
kern.mqueue.maxmsgsize: 16384
Linux
On Linux, the message queue is most likely available already. Otherwise, mount the file system simply with:
$ mkdir -p /dev/mqueue
$ mount -t mqueue none /dev/mqueue
fortran-unix
The library fortran-unix
provides ISO C binding interfaces to selected POSIX routines on Unix, including
POSIX message queues. Clone the repository and build the static library
libfortran-unix.a
:
$ git clone https://github.com/interkosmos/fortran-unix
$ cd fortran-unix/
$ make freebsd
On Linux, simply run make linux
instead. Import the Fortran module
unix
into your application and link against
libfortran-unix.a -lrt
.
Flags
The C functions mq_send()
and mq_receive()
are
blocking, unless flag O_NONBLOCK
is passed to
mq_open()
. In Fortran, the interfaces start with prefix
c_
. The permission in argument mode
is given in octal
format, using the BOZ literal constant o
.
integer(kind=c_mqd_t) :: mqds
integer :: rc
! Open MQ in non-blocking mode.
mqds = c_mq_open(name = '/fortran' // c_null_char, &
oflag = ior(O_RDWR, O_NONBLOCK), &
mode = int(o'0644', kind=c_mode_t), &
attr = c_null_ptr)
if (mqds < 0) stop
rc = c_mq_send(mqds, 'Hello, World!', 13_c_size_t, O_NONBLOCK)
rc = c_mq_close(mqds)
Example
The size of the buffer passed to c_mq_receive()
must be greater
than the maximum message size given by c_mq_getattr()
. We can set
the maximum message size with c_mq_setattr()
beforehand. On
FreeBSD, new message queues are limited to 10 messages by default, with a
maximum message size of 1024 bytes each, if not specified otherwise in the
passed c_mq_attr
derived type.
The example program will first create a new message queue
/fortran
, and then spawn two OpenMP threads for in-process
communication. Both connect to the newly created message queue, with one sending
and the other waiting to receive a message.
! mqueue.f90
program main
use, intrinsic :: omp_lib
use :: unix
implicit none
character(len=*), parameter :: MQ_NAME = '/fortran' ! New MQ in, e.g., `/mnt/mqueue/<name>`.
integer, parameter :: MQ_PERM = int(o'0644') ! MQ permissions (octal).
integer, parameter :: MQ_BFSZ = 16384 ! MQ input buffer size.
integer :: rc ! Return code.
integer(kind=c_mqd_t) :: mqds ! MQ file descriptor.
type(c_mq_attr) :: attr ! MQ attributes.
! Set OpenMP options.
call omp_set_dynamic(.false.)
call omp_set_num_threads(2)
! Unlink, if MQ already exists.
rc = c_mq_unlink(MQ_NAME // c_null_char)
! Create new message queue `/fortran`.
print '("[master] Creating new message queue ", a, " ...")', MQ_NAME
mqds = c_mq_open(name = MQ_NAME // c_null_char, &
oflag = ior(O_CREAT, O_RDWR), &
mode = int(MQ_PERM, kind=c_mode_t), &
attr = c_null_ptr)
if (mqds < 0) then
call c_perror('mq_open()' // c_null_char)
stop
end if
! Get message queue attributes.
if (c_mq_getattr(mqds, attr) < 0) &
call c_perror('mq_getattr()' // c_null_char)
print '("[master] MQ flags.............: ", i0)', attr%mq_flags
print '("[master] MQ max. # of messages: ", i0)', attr%mq_maxmsg
print '("[master] MQ max. message size.: ", i0)', attr%mq_msgsize
print '("[master] Current # of messages: ", i0)', attr%mq_curmsgs
!$omp parallel private(rc)
!$omp sections
!$omp section
! Sender thread.
rc = mqueue_send(name = MQ_NAME, &
perm = MQ_PERM, &
msg = 'Hello, World!')
if (rc < 0) call c_perror('mqueue_send()' // c_null_char)
!$omp section
! Receiver thread.
rc = mqueue_receive(name = MQ_NAME, &
perm = MQ_PERM)
if (rc < 0) call c_perror('mqueue_receive()' // c_null_char)
!$omp end sections
!$omp end parallel
! Close and unlink message queue.
print '("[master] Closing and removing message queue ...")'
if (c_mq_close(mqds) < 0) call c_perror('mq_close()' // c_null_char)
rc = c_mq_unlink(MQ_NAME // c_null_char)
contains
function mqueue_receive(name, perm) result (rc)
character(len=*), intent(in) :: name !! MQ name.
integer, intent(in) :: perm !! MQ permissions.
integer :: rc !! Return code.
character(len=MQ_BFSZ) :: buf !! Input buffer (> MQ max. message size).
integer(kind=c_mqd_t) :: mqds !! MQ file descriptor.
integer(kind=c_size_t) :: sz !! Bytes received.
rc = -1
print '("[receiver] Opening message queue ", a, " ...")', name
mqds = c_mq_open(name = name // c_null_char, &
oflag = O_RDONLY, &
mode = int(perm, kind=c_mode_t), &
attr = c_null_ptr)
if (mqds < 0) return
! Receive message. Make sure to clear the buffer.
print '("[receiver] Waiting for message ...")'
buf = ' '
sz = c_mq_receive(mqds, buf, len(buf, kind=c_size_t), 0)
if (sz < 0) return
if (sz == 0) then
print '("[receiver] No message received")'
else
print '("[receiver] Received message ", a)', trim(buf)
end if
! Close message queue.
print '("[receiver] Closing message queue ...")'
if (c_mq_close(mqds) < 0) return
rc = 0
end function mqueue_receive
function mqueue_send(name, perm, msg) result (rc)
character(len=*), intent(in) :: name !! MQ name.
integer, intent(in) :: perm !! MQ permissions.
character(len=*), intent(in) :: msg !! Message text.
integer :: rc !! Return code.
integer(kind=c_mqd_t) :: mqds !! MQ file descriptor.
rc = -1
print '("[sender] Opening message queue ", a, " ...")', name
mqds = c_mq_open(name = name // c_null_char, &
oflag = O_WRONLY, &
mode = int(perm, kind=c_mode_t), &
attr = c_null_ptr)
if (mqds < 0) return
! Send message.
print '("[sender] Sending message ", a, " ...")', trim(msg)
if (c_mq_send(mqds, msg, len(msg, kind=c_size_t), 1) < 0) return
! Close message queue.
print '("[sender] Closing message queue ...")'
if (c_mq_close(mqds) < 0) return
rc = 0
end function mqueue_send
end program main
Link the example program against OpenMP,
libfortran-unix.a
, and -lrt
, and then run the
executable mqueue
:
$ gfortran13 -fopenmp -o mqueue mqueue.f90 libfortran-unix.a -lrt
$ ./mqueue
[master] Creating new message queue /fortran ...
[master] MQ flags.............: 0
[master] MQ max. # of messages: 10
[master] MQ max. message size.: 1024
[master] Current # of messages: 0
[sender] Opening message queue /fortran ...
[sender] Sending message Hello, World! ...
[receiver] Opening message queue /fortran ...
[sender] Closing message queue ...
[receiver] Waiting for message ...
[receiver] Received message Hello, World!
[receiver] Closing message queue ...
[master] Closing and removing message queue ...
Message queues that have not been closed properly can be deleted with unlink(1):
$ unlink /mnt/mqueue/<name>
Fortran Libraries
- fortran-unix: Fortran 2008 interface bindings to POSIX message queues
References
- QNX C Library Reference: POSIX Message Queues
< UNIX System V Message Queues | [Index] | cURL > |