POSIX Message Queues

POSIX message queues are a priority-driven IPC mechanism on Unix-like operating systems, similar to the older System V message queues. Multiple readers and writers can access a message queue and exchange arbitrary data simultaneously. Each message is sent with a priority, so the queue is kept sorted, with the oldest message of the highest priority always at the front.

Furthermore, a process can determine how many messages are currently on the queue, the options and flags that have been set, and the number of processes that are blocking to either send or receive. POSIX message queues exist in the file system space and have a distinct path name. Therefore, the name of a message queue always starts with a slash (/), and access to it is managed using POSIX permissions.

Prerequisites

In contrast to System V messages, the POSIX equivalent is often not enabled by default. Depending on the operating system, a few configuration steps are involved to create the appropriate file system structure.

FreeBSD

On FreeBSD, make sure the kernel module mqueuefs is loaded, and a message queue file system is mounted:

# kldload mqueuefs
# mkdir -p /mnt/mqueue
# mount -t mqueuefs null /mnt/mqueue

Add a kld_list entry to /etc/rc.conf to load the kernel module at boot time:

# sysrc kld_list+="mqueuefs"

You may furthermore want to add a mount point in /etc/fstab:

# Device    Mountpoint  FStype      Options     Dump    Pass
null        /mnt/mqueue mqueuefs    rw          0       0

We can get/set the system limits regarding the maximum number of messages and the maximum message size with sysctl(8):

$ sysctl kern.mqueue.maxmsg
kern.mqueue.maxmsg: 100
$ sysctl kern.mqueue.maxmsgsize
kern.mqueue.maxmsgsize: 16384

Linux

On Linux, the message queue is most likely available already. Otherwise, mount the file system simply with:

# mkdir -p /dev/mqueue
# mount -t mqueue none /dev/mqueue

fortran-unix

The library fortran-unix provides ISO C binding interfaces to selected POSIX routines on Unix, including POSIX message queues. Clone the repository and build the static library libfortran-unix.a:

$ git clone https://github.com/interkosmos/fortran-unix
$ cd fortran-unix/
$ make freebsd

On Linux, simply run make linux instead. Import the Fortran module unix into your application, and link it against libfortran-unix.a and -lrt.

Flags

The C functions mq_send() and mq_receive() block unless the flag O_NONBLOCK is passed to mq_open(). In Fortran, the interfaces carry the prefix c_. The permissions in argument mode are given in octal notation, using a BOZ literal constant with the prefix o.

integer(kind=c_mqd_t) :: mqds
integer               :: rc

! Open an existing MQ in non-blocking mode.
mqds = c_mq_open(name  = '/fortran' // c_null_char, &
                 oflag = ior(O_RDWR, O_NONBLOCK), &
                 mode  = int(o'0644', kind=c_mode_t), &
                 attr  = c_null_ptr)
if (mqds < 0) stop

! Send with priority 1. The last argument is the message priority, not a flag.
rc = c_mq_send(mqds, 'Hello, World!', 13_c_size_t, 1)
rc = c_mq_close(mqds)
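
The interplay of O_NONBLOCK and message priorities can be sketched with a single-process program. This is only an illustration under a few assumptions: the queue name /fortran_prio and the priorities 1, 5, and 9 are made up for the example, and the interfaces are called the same way as elsewhere on this page. The last argument of c_mq_send() is the priority of the message.

```fortran
! mqprio.f90
program mqprio
    use :: unix
    implicit none
    character(len=*), parameter :: MQ_NAME = '/fortran_prio' ! Hypothetical queue name.

    character(len=:), allocatable :: buf  ! Receive buffer.
    integer                       :: n    ! Number of messages received.
    integer                       :: rc   ! Return code.
    integer(kind=c_mqd_t)         :: mqds ! MQ descriptor.
    integer(kind=c_size_t)        :: sz   ! Bytes received.
    type(c_mq_attr)               :: attr ! MQ attributes.

    ! Remove a stale queue of the same name, then create a non-blocking one.
    rc = c_mq_unlink(MQ_NAME // c_null_char)
    mqds = c_mq_open(name  = MQ_NAME // c_null_char, &
                     oflag = ior(ior(O_CREAT, O_RDWR), O_NONBLOCK), &
                     mode  = int(o'0644', kind=c_mode_t), &
                     attr  = c_null_ptr)

    if (mqds < 0) then
        call c_perror('mq_open()' // c_null_char)
        stop
    end if

    ! Size the receive buffer to the maximum message size of the queue.
    if (c_mq_getattr(mqds, attr) < 0) then
        call c_perror('mq_getattr()' // c_null_char)
        stop
    end if

    allocate (character(len=attr%mq_msgsize) :: buf)

    ! Enqueue three messages with ascending, descending, and middle priority.
    rc = c_mq_send(mqds, 'low',    3_c_size_t, 1)
    rc = c_mq_send(mqds, 'high',   4_c_size_t, 9)
    rc = c_mq_send(mqds, 'medium', 6_c_size_t, 5)

    ! Drain the queue. In non-blocking mode, c_mq_receive() returns a
    ! negative value instead of waiting once the queue is empty.
    n = 0

    do
        buf(:) = ' ' ! Clear the buffer without changing its length.
        sz = c_mq_receive(mqds, buf, len(buf, kind=c_size_t), 0)
        if (sz < 0) exit
        n = n + 1
        print '("received: ", a)', buf(1:sz)
    end do

    print '(i0, " messages received")', n

    rc = c_mq_close(mqds)
    rc = c_mq_unlink(MQ_NAME // c_null_char)
end program mqprio
```

If all calls succeed, the messages should arrive in the order high, medium, low, regardless of the order in which they were sent. Without O_NONBLOCK, the loop would not terminate, as the final c_mq_receive() would wait for a fourth message.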

Example

The size of the buffer passed to c_mq_receive() must be at least the maximum message size reported by c_mq_getattr(), otherwise the call fails. The maximum message size is fixed when the queue is created and can be requested through the attributes passed to c_mq_open(), whereas c_mq_setattr() only changes the queue flags. On FreeBSD, new message queues are limited to 10 messages by default, with a maximum message size of 1024 bytes each, if not specified otherwise in the passed c_mq_attr derived type.
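
A minimal sketch of requesting custom limits at creation time, and of sizing the receive buffer from the attributes that are actually in effect, might look as follows. Several points are assumptions: the queue name /fortran_attr and the limits of 5 messages and 256 bytes are arbitrary, and passing the attributes as c_loc(attr) is inferred from the fact that the attr argument accepts c_null_ptr. The requested values have to stay within the system-wide limits.

```fortran
! mqattr.f90
program mqattr
    use, intrinsic :: iso_c_binding, only: c_loc
    use :: unix
    implicit none
    character(len=*), parameter :: MQ_NAME = '/fortran_attr' ! Hypothetical queue name.

    character(len=:), allocatable :: buf  ! Receive buffer.
    integer                       :: rc   ! Return code.
    integer(kind=c_mqd_t)         :: mqds ! MQ descriptor.
    integer(kind=c_size_t)        :: sz   ! Bytes received.
    type(c_mq_attr), target       :: attr ! MQ attributes.

    ! Requested limits (arbitrary values for illustration).
    attr%mq_flags   = 0
    attr%mq_maxmsg  = 5
    attr%mq_msgsize = 256
    attr%mq_curmsgs = 0

    ! The limits only take effect when the queue is newly created.
    rc = c_mq_unlink(MQ_NAME // c_null_char)
    mqds = c_mq_open(name  = MQ_NAME // c_null_char, &
                     oflag = ior(O_CREAT, O_RDWR), &
                     mode  = int(o'0644', kind=c_mode_t), &
                     attr  = c_loc(attr))

    if (mqds < 0) then
        call c_perror('mq_open()' // c_null_char)
        stop
    end if

    ! Read back the attributes that are in effect.
    if (c_mq_getattr(mqds, attr) < 0) then
        call c_perror('mq_getattr()' // c_null_char)
        stop
    end if

    print '("MQ max. # of messages: ", i0)', attr%mq_maxmsg
    print '("MQ max. message size.: ", i0)', attr%mq_msgsize

    ! Allocate a buffer that is exactly as large as the maximum message size.
    allocate (character(len=attr%mq_msgsize) :: buf)
    buf(:) = ' '

    rc = c_mq_send(mqds, 'Hello, World!', 13_c_size_t, 1)
    sz = c_mq_receive(mqds, buf, len(buf, kind=c_size_t), 0)
    if (sz > 0) print '("received: ", a)', buf(1:sz)

    rc = c_mq_close(mqds)
    rc = c_mq_unlink(MQ_NAME // c_null_char)
end program mqattr
```

Allocating the buffer from attr%mq_msgsize avoids hard-coding a size that may be too small on a system with different defaults.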

The example program will first create a new message queue /fortran, and then spawn two OpenMP threads for in-process communication. Both connect to the newly created message queue, with one sending and the other waiting to receive a message.

! mqueue.f90
program main
    use, intrinsic :: omp_lib
    use :: unix
    implicit none
    character(len=*), parameter :: MQ_NAME = '/fortran'   ! New MQ in, e.g., `/mnt/mqueue/<name>`.
    integer,          parameter :: MQ_PERM = int(o'0644') ! MQ permissions (octal).
    integer,          parameter :: MQ_BFSZ = 16384        ! MQ input buffer size.

    integer               :: rc   ! Return code.
    integer(kind=c_mqd_t) :: mqds ! MQ file descriptor.
    type(c_mq_attr)       :: attr ! MQ attributes.

    ! Set OpenMP options.
    call omp_set_dynamic(.false.)
    call omp_set_num_threads(2)

    ! Unlink, if MQ already exists.
    rc = c_mq_unlink(MQ_NAME // c_null_char)

    ! Create new message queue `/fortran`.
    print '("[master] Creating new message queue ", a, " ...")', MQ_NAME
    mqds = c_mq_open(name  = MQ_NAME // c_null_char, &
                     oflag = ior(O_CREAT, O_RDWR), &
                     mode  = int(MQ_PERM, kind=c_mode_t), &
                     attr  = c_null_ptr)

    if (mqds < 0) then
        call c_perror('mq_open()' // c_null_char)
        stop
    end if

    ! Get message queue attributes.
    if (c_mq_getattr(mqds, attr) < 0) &
        call c_perror('mq_getattr()' // c_null_char)

    print '("[master] MQ flags.............: ", i0)', attr%mq_flags
    print '("[master] MQ max. # of messages: ", i0)', attr%mq_maxmsg
    print '("[master] MQ max. message size.: ", i0)', attr%mq_msgsize
    print '("[master] Current # of messages: ", i0)', attr%mq_curmsgs

    !$omp parallel private(rc)
    !$omp sections
    !$omp section

        ! Sender thread.
        rc = mqueue_send(name = MQ_NAME, &
                         perm = MQ_PERM, &
                         msg  = 'Hello, World!')
        if (rc < 0) call c_perror('mqueue_send()' // c_null_char)

    !$omp section

        ! Receiver thread.
        rc = mqueue_receive(name = MQ_NAME, &
                            perm = MQ_PERM)
        if (rc < 0) call c_perror('mqueue_receive()' // c_null_char)

    !$omp end sections
    !$omp end parallel

    ! Close and unlink message queue.
    print '("[master] Closing and removing message queue ...")'
    if (c_mq_close(mqds) < 0) call c_perror('mq_close()' // c_null_char)
    rc = c_mq_unlink(MQ_NAME // c_null_char)
contains
    function mqueue_receive(name, perm) result (rc)
        character(len=*), intent(in) :: name !! MQ name.
        integer,          intent(in) :: perm !! MQ permissions.
        integer                      :: rc   !! Return code.

        character(len=MQ_BFSZ) :: buf  !! Input buffer (>= MQ max. message size).
        integer(kind=c_mqd_t)  :: mqds !! MQ file descriptor.
        integer(kind=c_size_t) :: sz   !! Bytes received.

        rc = -1

        print '("[receiver] Opening message queue ", a, " ...")', name
        mqds = c_mq_open(name  = name // c_null_char, &
                         oflag = O_RDONLY, &
                         mode  = int(perm, kind=c_mode_t), &
                         attr  = c_null_ptr)
        if (mqds < 0) return

        ! Receive message. Make sure to clear the buffer.
        print '("[receiver] Waiting for message ...")'
        buf = ' '
        sz  = c_mq_receive(mqds, buf, len(buf, kind=c_size_t), 0)

        if (sz < 0) return

        if (sz == 0) then
            print '("[receiver] No message received")'
        else
            print '("[receiver] Received message ", a)', trim(buf)
        end if

        ! Close message queue.
        print '("[receiver] Closing message queue ...")'
        if (c_mq_close(mqds) < 0) return

        rc = 0
    end function mqueue_receive

    function mqueue_send(name, perm, msg) result (rc)
        character(len=*), intent(in) :: name !! MQ name.
        integer,          intent(in) :: perm !! MQ permissions.
        character(len=*), intent(in) :: msg  !! Message text.
        integer                      :: rc   !! Return code.

        integer(kind=c_mqd_t) :: mqds !! MQ file descriptor.

        rc = -1

        print '("[sender] Opening message queue ", a, " ...")', name
        mqds = c_mq_open(name  = name // c_null_char, &
                         oflag = O_WRONLY, &
                         mode  = int(perm, kind=c_mode_t), &
                         attr  = c_null_ptr)
        if (mqds < 0) return

        ! Send message.
        print '("[sender] Sending message ", a, " ...")', trim(msg)
        if (c_mq_send(mqds, msg, len(msg, kind=c_size_t), 1) < 0) return

        ! Close message queue.
        print '("[sender] Closing message queue ...")'
        if (c_mq_close(mqds) < 0) return

        rc = 0
    end function mqueue_send
end program main

Compile the example program with OpenMP enabled, link it against libfortran-unix.a and -lrt, and then run the executable mqueue:

$ gfortran13 -fopenmp -o mqueue mqueue.f90 libfortran-unix.a -lrt
$ ./mqueue
[master] Creating new message queue /fortran ...
[master] MQ flags.............: 0
[master] MQ max. # of messages: 10
[master] MQ max. message size.: 1024
[master] Current # of messages: 0
[sender] Opening message queue /fortran ...
[sender] Sending message Hello, World! ...
[receiver] Opening message queue /fortran ...
[sender] Closing message queue ...
[receiver] Waiting for message ...
[receiver] Received message Hello, World!
[receiver] Closing message queue ...
[master] Closing and removing message queue ...

Message queues that have not been unlinked properly can be deleted from the mounted file system with unlink(1):

$ unlink /mnt/mqueue/<name>
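
The same clean-up can presumably be done from Fortran with c_mq_unlink(), which takes the queue name rather than the path below the mount point. The following sketch assumes a small, hypothetical helper program mqrm that expects the queue name (for instance, /fortran) as its first command-line argument:

```fortran
! mqrm.f90
program mqrm
    use :: unix
    implicit none
    character(len=255) :: name ! Queue name with leading slash.
    integer            :: rc   ! Return code.

    rc = 0
    call get_command_argument(1, name)

    if (len_trim(name) == 0) then
        print '("Usage: mqrm /<name>")'
        stop
    end if

    ! Remove the queue by name, not by its path in the mounted file system.
    rc = c_mq_unlink(trim(name) // c_null_char)

    if (rc < 0) then
        call c_perror('mq_unlink()' // c_null_char)
    else
        print '("Removed message queue ", a)', trim(name)
    end if
end program mqrm
```

The program is compiled and linked like the example above and would be invoked as ./mqrm /fortran.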
