ZeroMQ

ZeroMQ (short ØMQ or ZMQ) is a lightweight messaging kernel library written in C++ that extends standard socket interfaces for in-process, inter-process, TCP, and multicast communication. ZeroMQ bindings are available for many programming languages.

The library can be integrated into Fortran applications to implement, for example, asynchronous message queues, or request-reply patterns for remote procedure calls (RPC). Several messaging patterns can be accessed across multiple transport protocols:

Request-Reply
Connects a set of clients to a set of services, for remote procedure calls and task distributions.
Pub-Sub
A data distribution patterns that connects a set of publishers to a set of subscribers.
Pipeline
A fan-out/fan-in pattern that can have multiple steps and loops, for parallel task distribution and collection.
Exclusive Pair
Used to connect two threads in a process.

ZeroMQ is licenced under the GNU LGPL, v. 3.

Installation

The ZeroMQ library version 4 runs on most Unix-like operating systems. On FreeBSD, install ZeroMQ with:

# pkg install net/libzmq4

There are two Fortran bindings available: F77_ZMQ (GNU LGPL, v. 2.1), a now out-dated interface for FORTRAN 77, and FZMQ (Mozilla Public License, v. 2.0), for Fortran 2003. Compile the FZMQ bindings using CMake. Pandoc has to be present to generate the documentation:

$ unzip fzmq-master.zip
$ cd fzmq-master/
$ mkdir build && cd build/
$ cmake ..
$ make

After compilation, the directory build/src/ contains the static library libfzmq.a and the module header zmq.mod for linking. Optionally, run make install to install the bindings globally.

Example

The example uses the sections construct of OpenMP to spawn two independent threads that exchange data through ZeroMQ’s in-process transport. The data is shared directly in memory, using the same context.

! example.f90
program main
    use, intrinsic :: iso_c_binding
    use, intrinsic :: omp_lib
    use :: zmq
    implicit none
    integer, target :: value
    integer         :: rc
    type(c_ptr)     :: context
    type(c_ptr)     :: socket

    print '(a)', 'ZeroMQ Test'
    print '(a)', repeat('-', 32)

    ! Set OpenMP options.
    call omp_set_dynamic(.false.)
    call omp_set_num_threads(2)

    print '(a)', 'Creating new context ...'
    context = zmq_ctx_new()

    !$omp parallel shared(context), private(socket, rc, value)
    !$omp sections
    !$omp section

    ! Sender thread.
    print '("[Thread ", i0, "] Thread created")', omp_get_thread_num()
    socket = zmq_socket(context, ZMQ_PAIR)
    rc     = zmq_connect(socket, 'inproc://fzmq')

    value = 123
    print '("[Thread ", i0, "] Sending value ", i0, " ...")', omp_get_thread_num(), value
    call send(socket, value)
    rc = zmq_close(socket)

    !$omp section

    ! Receiver thread.
    print '("[Thread ", i0, "] Thread created")', omp_get_thread_num()
    socket = zmq_socket(context, ZMQ_PAIR)
    rc     = zmq_bind(socket, 'inproc://fzmq')

    call recv(socket, value)
    print '("[Thread ", i0, "] Received value ", i0)', omp_get_thread_num(), value
    rc = zmq_close(socket)

    !$omp end sections
    !$omp end parallel

    print '(a)', 'Terminating context ...'
    rc = zmq_ctx_term(context)
contains
    subroutine send(socket, value)
        type(c_ptr),                  intent(inout) :: socket
        integer(kind=c_int), pointer, intent(in)    :: value
        integer(kind=c_int)                         :: nbytes
        integer(kind=c_int)                         :: rc
        type(zmq_msg_t)                             :: message

        rc     = zmq_msg_init_data(message, c_loc(value), c_sizeof(value), c_null_ptr, c_null_ptr)
        nbytes = zmq_msg_send(message, socket, 0)
    end subroutine

    subroutine recv(socket, value)
        type(c_ptr), intent(inout)             :: socket
        integer,     intent(out)               :: value
        character(kind=c_char, len=:), pointer :: buffer
        character(kind=c_char, len=:), pointer :: range
        integer                                :: nbytes
        integer                                :: rc
        type(c_ptr)                            :: data
        type(zmq_msg_t)                        :: message

        rc     = zmq_msg_init(message)
        nbytes = zmq_msg_recv(message, socket, 0)
        data   = zmq_msg_data(message)

        call c_f_pointer(data, buffer)
        range => buffer(1:c_sizeof(value))
        value = transfer(range, value)

        rc = zmq_msg_close(message)
    end subroutine
end program main

Copy the compiled ZeroMQ bindings library libfzmq.a lib/ and the module headers zmq.mod include/ inside the example project’s workspace directory. The directory should contain the following files:

Link the example with -lfzmq -lzmq:

$ gfortran10 -Wall -fopenmp -I/usr/local/include/ -I./include/ -L/usr/local/lib/ -L./lib/ \
  -o example example.f90 -lfzmq -lzmq

Running the executable outputs:

$ ./example
ZeroMQ Test
--------------------------------
Creating new context ...
[Thread 0] Thread created
[Thread 1] Thread created
[Thread 0] Sending value 123 ...
[Thread 1] Received value 123
Terminating context ...

HTTP Server

ZeroMQ HTTP server output
Fig. 1: The served HTML document

ZeroMQ 4.0 introduced the ZMQ_STREAM socket type that is used to send and receive TCP streams asynchronously, either as client or server. The pattern allows us to implement a basic HTTP server in Fortran:

! http.f90
!
! Basic HTTP server written in Fortran 2003. Uses the `ZMQ_STREAM` socket of
! ZeroMQ 4. The server listens for incoming HTTP connections on 0.0.0.0:8080.
!
! Example is taken from:
!     http://api.zeromq.org/4-3:zmq-socket
program main
    use, intrinsic :: iso_c_binding
    use :: zmq
    implicit none
    integer(kind=c_size_t), parameter :: BUFFER_SIZE  = int(4096, kind=c_size_t)
    character(len=*),       parameter :: CONTENT_TYPE = 'text/html'
    character(len=*),       parameter :: CR_LF        = achar(13) // achar(10)
    character(len=*),       parameter :: ADDRESS      = '*' ! Or: '127.0.0.1'
    character(len=*),       parameter :: PORT         = '8080'
    character(len=*),       parameter :: HOST         = ADDRESS // ':' // PORT

    character(len=:),       allocatable :: request
    character(len=:),       allocatable :: response
    integer(kind=c_int8_t), allocatable :: id(:)
    integer                             :: rc
    type(c_ptr)                         :: context
    type(c_ptr)                         :: socket

    ! Create new ZeroMQ context and socket.
    context = zmq_ctx_new()
    socket  = zmq_socket(context, ZMQ_STREAM)

    ! Bind to IP address and port.
    if (zmq_bind(socket, 'tcp://' // HOST) /= 0) then
        stop 'Binding to address ' // HOST // ' failed'
    end if

    print '(3a)', 'Serving ', HOST, ' ...'

    response = 'HTTP/1.0 200 OK' // CR_LF // &
               'Content-Type: ' // CONTENT_TYPE // CR_LF // &
               CR_LF // &
               '<!DOCTYPE html>' // CR_LF // &
               '<html lang="en">' // CR_LF // &
               '<head>' // CR_LF // &
               '    <meta charset="utf-8">' // CR_LF // &
               '    <title>ZeroMQ HTTP Server</title>' // CR_LF // &
               '</head>' // CR_LF // &
               '<body><h1>Hello, from Fortran!</h1></body>' // CR_LF // &
               '</html>'

    ! Event loop.
    do
        ! Wait for HTTP request.
        call http_receive(socket, id, request)

        ! Send default HTTP response in HTML format.
        call http_send(socket, id, response)
    end do

    ! Close socket and destroy context.
    rc = zmq_close(socket)
    rc = zmq_ctx_destroy(context)
contains
    subroutine http_receive(socket, id, request)
        !! Returns ZeroMQ message id and received HTTP request.
        type(c_ptr),                         intent(inout) :: socket
        integer(kind=c_int8_t), allocatable, intent(out)   :: id(:)
        character(len=:),       allocatable, intent(out)   :: request
        integer(kind=c_int8_t), target                     :: raw(BUFFER_SIZE)
        integer(kind=c_size_t)                             :: nbytes
        integer                                            :: i

        ! Receive [id, ] message (with empty payload) on client connection.
        ! First, get the id frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes == 0_c_size_t) return

        nbytes = min(nbytes, BUFFER_SIZE)
        allocate (id(nbytes))
        id(:) = raw(1:nbytes)

        ! Then, get the empty payload frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes /= 0_c_size_t) return

        ! Receive [id, playload] message containing the HTTP request.
        ! First, get the id frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes == 0_c_size_t) return

        ! Then, get the payload frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        nbytes= min(nbytes, BUFFER_SIZE)
        request  = ''

        ! Convert `c_int8_t` to Fortran character.
        do i = 1, int(nbytes)
            request = request // achar(raw(i))
        end do
    end subroutine http_receive

    subroutine http_send(socket, id, response)
        !! Sends a ZeroMQ message containing the HTTP response.
        type(c_ptr),                    intent(inout) :: socket
        integer(kind=c_int8_t), target, intent(in)    :: id(:)
        character(len=*),       target, intent(in)    :: response
        integer(kind=c_size_t)                        :: id_size
        integer(kind=c_size_t)                        :: response_size
        integer                                       :: rc

        id_size = int(size(id), kind=c_size_t)
        response_size = int(len(response), kind=c_size_t)

        ! Send the routing id frame followed by the response.
        rc = zmq_send(socket, c_loc(id), id_size, ZMQ_SNDMORE)
        rc = zmq_send(socket, c_loc(response), response_size, 0)

        ! Close the connection by sending the routing id frame followed by
        ! a zero response.
        rc = zmq_send(socket, c_loc(id), id_size, ZMQ_SNDMORE)
        rc = zmq_send(socket, c_null_ptr, 0_c_size_t, 0)
    end subroutine http_send
end program main

Compile, link, and run the HTTP server with:

$ gfortran10 -I/usr/local/include/ -I./include/ -L/usr/local/lib/ -L./lib/ -o http http.f90 -lfzmq -lzmq
$ ./http
Serving *:8080 ...

Point the include and library search paths to the directories containing libfzmq.a and zmq.mod. Open a web browser and access http://127.0.0.1:8080/.

References