UNIX System V Message Queues

Threads and processes can utilise UNIX System V (SysV) message queues to send and receive data in an arbitrary order for in-process and inter-process communication. SysV message queues were later adopted by a POSIX standard.

C Function Description
msgget() Creates a new message queue or connects to an existing one.
msgsnd() Sends a message to the message queue.
msgrcv() Receives messages of all types or only messages of the given type from the message queue. The access is blocking by default, unless flag IPC_NOWAIT is set.
msgctl() Changes permissions or closes the message queue.
Table 1: System V IPC message functions in C

Only a few routine calls are necessary to interact with SysV message queues from Fortran (table 1). The fortran-unix library includes interface bindings to access them. A custom derived type that stores the message data has to be declared individually:

integer, parameter :: MESSAGE_LEN = 512

type, bind(c) :: message_type
    integer(kind=c_long)   :: type
    character(kind=c_char) :: text(MESSAGE_LEN)
end type message_type

The maximum size of a message is limited, usually to 4056 byte.

For debugging, we can output the numeric error code that gives further information regarding the type of error (table 2). The C integer errno in errno.h is defined as a macro and cannot be accessed directly from Fortran. A wrapper function in C and the Fortran function interface c_errno() is provided by fortran-unix for convenience. The interface c_perror() prints the message string of the last error to stderr.

Constant Description
ENOENT No message queue exists for key and msgflg did not specify IPC_CREAT.
ENOMEM The system does not have enough memory for the new data structure.
EACCES The calling process does not have permission to access the queue.
EEXIST A message queue already exists for key.
ENOSPC The system limit for the maximum number of message queues (MSGMNI) is reached.
Table 2: Error codes written to errno by msgget()

fortran-unix

Clone the fortran-unix repository and build the static library libfortran-unix.a:

$ git clone https://github.com/interkosmos/fortran-unix
$ cd fortran-unix/
$ make

Import the Fortran module unix into your application and link against libfortran-unix.a.

Example

The example opens a new SysV message queue with c_msgget(), sends a message of type 1 with c_msgsnd(), and then waits until c_msgrcv() returns the received message. The message queue access has been abstracted with the Fortran functions ipc_send() and ipc_receive(). The example combines sender and receiver in a single process. In general, a message queue would rather be used to interconnect separate threads or processes with each other.

! example.f90
module ipc
    use, intrinsic :: iso_c_binding
    use :: unix
    implicit none
    private
    integer,         parameter, public :: IPC_PERM = int(o'0666')   ! Access permissions (octal).
    integer(kind=8), parameter, public :: IPC_TYPE = 1              ! Message type.

    ! Example message type for SysV message queues.
    integer(kind=c_size_t), parameter, public :: MESSAGE_LEN = 512

    type, bind(c), public :: c_message_type
        integer(kind=c_long)   :: type
        character(kind=c_char) :: text(MESSAGE_LEN)
    end type c_message_type

    public :: ipc_receive
    public :: ipc_send
contains
    function ipc_receive(msqid, type, text, flag)
        !! Waits for message of given type and returns message text. Calling the
        !! function is blocking, unless `flag` is set to `IPC_NOWAIT`.
        integer,          intent(in)  :: msqid
        integer(kind=8),  intent(in)  :: type
        character(len=*), intent(out) :: text
        integer,          intent(in)  :: flag
        integer(kind=8)               :: ipc_receive
        type(c_message_type), target  :: message

        ipc_receive = c_msgrcv(msqid, c_loc(message), c_sizeof(message%text), type, flag)
        call c_f_str_chars(message%text, text)
    end function ipc_receive

    function ipc_send(msqid, type, text, flag)
        !! Converts Fortran string to C char array, and then sends message of
        !! given type by calling `c_msgsnd()`.
        integer,          intent(in) :: msqid
        integer(kind=8),  intent(in) :: type
        character(len=*), intent(in) :: text
        integer,          intent(in) :: flag
        integer                      :: ipc_send
        type(c_message_type), target :: message

        message%type = type
        call f_c_str_chars(text, message%text)
        ipc_send = c_msgsnd(msqid, c_loc(message), c_sizeof(message%text), flag)
    end function ipc_send
end module ipc

program main
    use, intrinsic :: iso_c_binding
    use, intrinsic :: iso_fortran_env, only: stderr => error_unit, stdout => output_unit
    use :: unix
    use :: ipc
    implicit none

    character(len=MESSAGE_LEN) :: buf   ! Message text buffer.
    integer                    :: msqid ! Message queue id.

    ! Create new message queue.
    msqid = c_msgget(IPC_PRIVATE, ior(IPC_CREAT, IPC_PERM))

    if (msqid < 0) then
        call c_perror('msgget()' // c_null_char)
        stop
    end if

    print '(a, i0, /)', 'Message Queue ID: ', msqid

    ! Send message to message queue.
    print '(a)', 'Sending message ...'

    if (ipc_send(msqid = msqid, &
                 type  = IPC_TYPE, &
                 text  = 'Hello, World!', &
                 flag  = IPC_NOWAIT) < 0) then
        call c_perror('msgsnd()' // c_null_char)
    else
        print '(a)', 'Done.'
    end if

    ! Receive message from message queue (blocking I/O). Set `flag` to
    ! `IPC_NOWAIT` for non-blocking I/O.
    print '(a)', 'Receiving message ...'

    if (ipc_receive(msqid = msqid, &
                    type  = IPC_TYPE, &
                    text  = buf, &
                    flag  = 0) < 0) then
        call c_perror('msgrcv()' // c_null_char)
    else
        print '(2a)', 'Received: ', trim(buf)
    end if

    ! Wait for user input.
    print '(/, a)', 'Press Enter to quit.'
    read (*, '(a)')

    ! Remove message queue.
    print '(a)', 'Closing message queue ...'

    if (c_msgctl(msqid, IPC_RMID, c_null_ptr) < 0) then
        call c_perror('msgctl()' // c_null_char)
    end if
end program main

Compile, link, and run the message queue example with:

$ gfortran10 -o example example.f90 libfortran-unix.a
$ ./example
Message Queue ID: 393227

Sending message ...
Done.
Waiting for message ...
Received: Hello, World!

Press Enter to quit.

Another process can open the established message queue for inter-process communication by calling, in this particular case, c_msgget(393227_8, int(o'0666')) from Fortran.

It is crucial to always remove opened message queues, as no more queues can be created once the system limit has been reached (usually 40). The command-line tool ipcs(1) lists all open SysV message queues:

$ ipcs -q
Message Queues:
T           ID          KEY MODE        OWNER    GROUP
q       393227            0 --rw--wa-w- user     user

Zombie message queues have to be manually removed with ipcrm(1):

$ ipcrm -q <msqid>

The following KornShell script ipcrmall.ksh removes all message queues of a given user, in case there are too many open queues left. On Linux, change gawk to awk:

#!/usr/bin/env ksh93

user=$1
ids=`ipcs -q | grep $user | gawk '{ print $2; }'`

for id in $ids; do
    ipcrm -q $id
done

ipcs -q

Run the script with:

$ ksh93 ./ipcrmall.ksh <username>

On FreeBSD, make sure that GNU awk is installed.

References