ZeroMQ

ZeroMQ (also written ØMQ or ZMQ) is a lightweight messaging kernel library written in C++ that extends the standard socket interface with in-process, inter-process, TCP, and multicast transports. Bindings are available for many programming languages.

The library can be integrated into Fortran applications to implement, for example, asynchronous message queues or request-reply patterns for remote procedure calls (RPC). Several messaging patterns are available across the supported transports:

Request-Reply
Connects a set of clients to a set of services, for remote procedure calls and task distribution.
Pub-Sub
A data distribution pattern that connects a set of publishers to a set of subscribers.
Pipeline
A fan-out/fan-in pattern that can have multiple steps and loops, for parallel task distribution and collection.
Exclusive Pair
Used to connect two threads in a process.
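
As an illustration of the Request-Reply pattern, the following minimal sketch runs a client and a service in a single thread over the in-process transport. It only uses binding routines that also appear in the examples further down; the constants ZMQ_REQ and ZMQ_REP are assumed to be exported by the Fortran bindings, and the endpoint name inproc://reqrep is made up for this sketch.

```fortran
! reqrep.f90
program main
    use, intrinsic :: iso_c_binding
    use :: zmq
    implicit none

    ! Hypothetical endpoint name, chosen for this sketch only.
    character(len=*), parameter :: ENDPOINT = 'inproc://reqrep'

    character(len=5), target :: request = 'Hello'
    character(len=5), target :: reply   = 'World'
    character(len=8), target :: buffer
    integer                  :: n, rc
    type(c_ptr)              :: context, client, server

    context = zmq_ctx_new()

    ! ZMQ_REP and ZMQ_REQ are assumed to be provided by the bindings.
    server = zmq_socket(context, ZMQ_REP)
    client = zmq_socket(context, ZMQ_REQ)

    rc = zmq_bind(server, ENDPOINT)
    rc = zmq_connect(client, ENDPOINT)

    ! The client sends the request, the service receives it.
    rc = zmq_send(client, c_loc(request), len(request, kind=c_size_t), 0)
    n  = zmq_recv(server, c_loc(buffer), len(buffer, kind=c_size_t), 0)
    if (n > 0) print '("Service received: ", a)', buffer(1:min(n, len(buffer)))

    ! The service sends the reply, the client receives it.
    rc = zmq_send(server, c_loc(reply), len(reply, kind=c_size_t), 0)
    n  = zmq_recv(client, c_loc(buffer), len(buffer, kind=c_size_t), 0)
    if (n > 0) print '("Client received: ", a)', buffer(1:min(n, len(buffer)))

    rc = zmq_close(client)
    rc = zmq_close(server)
    rc = zmq_ctx_term(context)
end program main
```

A request-reply socket pair enforces strict alternation: each send on the client must be followed by a receive before the next send, and the reverse holds on the service side.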

ZeroMQ is licensed under the GNU LGPL v3.

Installation

The ZeroMQ library version 4 runs on most Unix-like operating systems. On FreeBSD, install ZeroMQ with:

# pkg install net/libzmq4

There are two Fortran bindings available: F77_ZMQ (GNU LGPL v2.1), a now outdated interface for FORTRAN 77, and FZMQ (Mozilla Public License v2.0), for Fortran 2003. Compile the FZMQ bindings using CMake. Pandoc has to be present to generate the documentation:

$ unzip fzmq-master.zip
$ cd fzmq-master/
$ mkdir build && cd build/
$ cmake ..
$ make

Once finished, the directory build/src/ contains the static library libfzmq.a and the module file zmq.mod required for compiling and linking. Optionally, run make install to install the bindings globally.
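
To check that the bindings and the ZeroMQ library can be found by the compiler and the linker, a small smoke test may help. The sketch below (the file name check.f90 is arbitrary) only creates and terminates a context, using the routines zmq_ctx_new() and zmq_ctx_term() of the bindings:

```fortran
! check.f90
program main
    use, intrinsic :: iso_c_binding
    use :: zmq
    implicit none

    integer     :: rc
    type(c_ptr) :: context

    ! A null pointer indicates that no context could be created.
    context = zmq_ctx_new()

    if (.not. c_associated(context)) then
        stop 'Error: creating ZeroMQ context failed'
    end if

    print '("ZeroMQ context created")'
    rc = zmq_ctx_term(context)
end program main
```

The program is compiled like any other program that uses the bindings, with the search paths pointing to zmq.mod and libfzmq.a, and with the linker flags -lfzmq -lzmq.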

Example

The example uses the sections construct of OpenMP to spawn two independent threads that exchange data through ZeroMQ’s in-process transport. Both threads share the same context, which the in-process transport requires, so the data is passed directly in memory.

! example.f90
program main
    use, intrinsic :: iso_c_binding
    use :: omp_lib
    use :: zmq
    implicit none

    integer, target :: data
    integer         :: rc
    type(c_ptr)     :: context
    type(c_ptr)     :: socket

    print '("ZeroMQ Test")'
    print '(a)', repeat('-', 32)

    ! Set OpenMP options.
    call omp_set_dynamic(.false.)
    call omp_set_num_threads(2)

    print '("Creating new context ...")'
    context = zmq_ctx_new()

    !$omp parallel shared(context), private(socket, rc, data)
    !$omp sections
    !$omp section

        ! Sender thread.
        print '("[Thread ", i0, "] Thread created")', omp_get_thread_num()
        socket = zmq_socket(context, ZMQ_PAIR)
        rc     = zmq_connect(socket, 'inproc://fzmq')

        data = 123
        print '("[Thread ", i0, "] Sending value ", i0, " ...")', omp_get_thread_num(), data
        call send(socket, data)
        rc = zmq_close(socket)

    !$omp section

        ! Receiver thread.
        print '("[Thread ", i0, "] Thread created")', omp_get_thread_num()
        socket = zmq_socket(context, ZMQ_PAIR)
        rc     = zmq_bind(socket, 'inproc://fzmq')

        call recv(socket, data)
        print '("[Thread ", i0, "] Received value ", i0)', omp_get_thread_num(), data
        rc = zmq_close(socket)

    !$omp end sections
    !$omp end parallel

    print '("Terminating context ...")'
    rc = zmq_ctx_term(context)
contains
    subroutine send(socket, data)
        type(c_ptr),                  intent(inout) :: socket
        integer(kind=c_int), pointer, intent(in)    :: data

        integer(kind=c_int) :: nbytes
        integer(kind=c_int) :: rc
        type(zmq_msg_t)     :: message

        rc     = zmq_msg_init_data(message, c_loc(data), c_sizeof(data), c_null_ptr, c_null_ptr)
        nbytes = zmq_msg_send(message, socket, 0)
    end subroutine

    subroutine recv(socket, data)
        type(c_ptr), intent(inout) :: socket
        integer,     intent(out)   :: data

        integer(kind=c_int), pointer :: value

        integer         :: nbytes
        integer         :: rc
        type(c_ptr)     :: ptr
        type(zmq_msg_t) :: message

        data   = 0
        rc     = zmq_msg_init(message)
        nbytes = zmq_msg_recv(message, socket, 0)

        ! Read the payload only if a complete integer has arrived.
        if (nbytes >= int(c_sizeof(data))) then
            ! Map the message buffer to a Fortran pointer and copy the value.
            ptr = zmq_msg_data(message)
            call c_f_pointer(ptr, value)
            data = value
        end if

        rc = zmq_msg_close(message)
    end subroutine
end program main
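
The receiving side has to turn the raw bytes of a message back into a Fortran value. Two standard ways of doing so are sketched below without any ZeroMQ calls: mapping the memory behind a C pointer with c_f_pointer(), and copying through a byte array with transfer(). The program and variable names are made up for this illustration.

```fortran
! bytes.f90
program main
    use, intrinsic :: iso_c_binding
    implicit none

    integer(kind=c_int), target         :: source
    integer(kind=c_int), pointer        :: view
    integer(kind=c_int)                 :: copy
    integer(kind=c_int8_t), allocatable :: raw(:)
    type(c_ptr)                         :: ptr

    source = 123

    ! Variant 1: reinterpret the memory behind a C pointer without copying.
    ptr = c_loc(source)
    call c_f_pointer(ptr, view)

    ! Variant 2: copy the value into a byte array and back with transfer().
    raw  = transfer(source, [0_c_int8_t])
    copy = transfer(raw, 0_c_int)

    print '("bytes: ", i0, ", view: ", i0, ", copy: ", i0)', size(raw), view, copy
end program main
```

The pointer variant avoids a copy but is only valid as long as the underlying buffer exists, for instance, until the message is closed. The transfer() variant yields an independent copy.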

Copy the compiled bindings library libfzmq.a to lib/ and the module file zmq.mod to include/ inside the example project’s workspace directory. The directory should then contain example.f90, include/zmq.mod, and lib/libfzmq.a.

Link the example against the static bindings library and the ZeroMQ shared library with -lfzmq -lzmq:

$ gfortran13 -Wall -fopenmp -I/usr/local/include -I./include -L/usr/local/lib -L./lib \
  -o example example.f90 -lfzmq -lzmq

Running the executable outputs:

$ ./example
ZeroMQ Test
--------------------------------
Creating new context ...
[Thread 0] Thread created
[Thread 1] Thread created
[Thread 0] Sending value 123 ...
[Thread 1] Received value 123
Terminating context ...

HTTP Server

Fig. 1: The served HTML document

ZeroMQ 4.0 introduced the ZMQ_STREAM socket type that lets us send and receive TCP streams asynchronously, either as client or server. This socket type makes it possible to implement a basic HTTP server in Fortran. The following program simply serves a static HTML document:

! http.f90
!
! Basic HTTP server written in Fortran 2003. Uses the `ZMQ_STREAM` socket of
! ZeroMQ 4. The server listens for incoming HTTP connections on 0.0.0.0:8080.
!
! Example has been ported from: http://api.zeromq.org/4-3:zmq-socket
program main
    use, intrinsic :: iso_c_binding
    use :: zmq
    implicit none
    integer(kind=c_size_t), parameter :: BUFFER_SIZE  = 4096_c_size_t
    character(len=*),       parameter :: CONTENT_TYPE = 'text/html'
    character(len=*),       parameter :: CR_LF        = achar(13) // achar(10)
    character(len=*),       parameter :: NL           = achar(10)
    character(len=*),       parameter :: ADDRESS      = '*' ! Or: '127.0.0.1'
    character(len=*),       parameter :: PORT         = '8080'
    character(len=*),       parameter :: HOST         = 'tcp://' // ADDRESS // ':' // PORT

    character(len=:),       allocatable :: request
    character(len=:),       allocatable :: response
    integer(kind=c_int8_t), allocatable :: id(:)

    integer     :: rc
    type(c_ptr) :: context
    type(c_ptr) :: socket

    ! Create new ZeroMQ context and socket.
    context = zmq_ctx_new()
    socket  = zmq_socket(context, ZMQ_STREAM)

    ! Bind to IP address and port.
    if (zmq_bind(socket, HOST) /= 0) then
        stop 'Error: binding to address ' // HOST // ' failed'
    end if

    print '("Serving ", a, " ...")', HOST

    response = 'HTTP/1.0 200 OK' // CR_LF // &
               'Content-Type: ' // CONTENT_TYPE // CR_LF // &
               CR_LF // &
               '<!DOCTYPE html>' // NL // &
               '<html lang="en">' // NL // &
               '<head>' // NL // &
               '    <meta charset="utf-8">' // NL // &
               '    <title>ZeroMQ HTTP Server</title>' // NL // &
               '</head>' // NL // &
               '<body bgcolor="#66cdaa">' // NL // &
               '    <h1>Hello, from Fortran!</h1></body>' // NL // &
               '</html>'

    ! Event loop.
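    do
        ! Wait for HTTP request.
        call http_receive(socket, id, request)

        ! Skip incomplete messages, as `id` or `request` are not allocated then.
        if (.not. allocated(id) .or. .not. allocated(request)) cycle

        ! Send default HTTP response and HTML payload.
        call http_send(socket, id, response)
    end do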

    ! Close socket and terminate context (not reached, as the event loop
    ! above never exits).
    rc = zmq_close(socket)
    rc = zmq_ctx_term(context)
contains
    subroutine http_receive(socket, id, request)
        !! Returns ZeroMQ message id and received HTTP request.
        type(c_ptr),                         intent(inout) :: socket
        integer(kind=c_int8_t), allocatable, intent(out)   :: id(:)
        character(len=:),       allocatable, intent(out)   :: request

        integer(kind=c_int8_t), target :: raw(BUFFER_SIZE)
        integer(kind=c_size_t)         :: i, nbytes

        ! Receive [id, ] message (with empty payload) on client connection.
        ! First, get the id frame. On error, `zmq_recv()` returns -1.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes <= 0_c_size_t) return

        nbytes = min(nbytes, BUFFER_SIZE)
        allocate (id(nbytes))
        id(:) = raw(1:nbytes)

        ! Then, get the empty payload frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes /= 0_c_size_t) return

        ! Receive [id, payload] message containing the HTTP request.
        ! First, get the id frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes <= 0_c_size_t) return

        ! Then, get the payload frame, truncated to the buffer size.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes < 0_c_size_t) return
        nbytes = min(nbytes, BUFFER_SIZE)

        ! Convert `c_int8_t` to Fortran character. Bytes above 127 are stored
        ! as negative values and are mapped to the range 0 to 255 first.
        allocate (character(len=nbytes) :: request)

        do i = 1, nbytes
            request(i:i) = char(iand(int(raw(i)), 255))
        end do
    end subroutine http_receive

    subroutine http_send(socket, id, response)
        !! Sends a ZeroMQ message containing the HTTP response.
        type(c_ptr),                    intent(inout) :: socket
        integer(kind=c_int8_t), target, intent(in)    :: id(:)
        character(len=*),       target, intent(in)    :: response

        integer(kind=c_size_t) :: id_size
        integer(kind=c_size_t) :: response_size
        integer                :: rc

        id_size = size(id, kind=c_size_t)
        response_size = len(response, kind=c_size_t)

        ! Send the routing id frame followed by the response.
        rc = zmq_send(socket, c_loc(id), id_size, ZMQ_SNDMORE)
        rc = zmq_send(socket, c_loc(response), response_size, 0)

        ! Close the connection by sending the routing id frame followed by
        ! a zero response.
        rc = zmq_send(socket, c_loc(id), id_size, ZMQ_SNDMORE)
        rc = zmq_send(socket, c_null_ptr, 0_c_size_t, 0)
    end subroutine http_send
end program main

Compile, link, and run the HTTP server with:

$ gfortran13 -I/usr/local/include -I./include -L/usr/local/lib -L./lib -o http http.f90 -lfzmq -lzmq
$ ./http
Serving tcp://*:8080 ...

Point the include and library search paths to the directories containing libfzmq.a and zmq.mod. Open a web browser and access http://127.0.0.1:8080/.
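
The response served by http.f90 is a fixed string. If the HTML payload is to be exchanged, the response can be assembled at run-time instead. The following sketch shows a hypothetical helper function http_response() that is not part of the program above. It additionally sets the standard Content-Length header, which the static response omits:

```fortran
! response.f90
program main
    implicit none
    character(len=*), parameter :: CR_LF = achar(13) // achar(10)

    character(len=:), allocatable :: response

    response = http_response('text/html', '<h1>Hello, from Fortran!</h1>')
    print '(a)', response
contains
    function http_response(content_type, body) result(response)
        !! Hypothetical helper: returns status line, headers, and body.
        character(len=*), intent(in)  :: content_type
        character(len=*), intent(in)  :: body
        character(len=:), allocatable :: response

        character(len=20) :: length

        ! Write the body length in bytes to a left-aligned string.
        write (length, '(i0)') len(body)

        response = 'HTTP/1.0 200 OK' // CR_LF // &
                   'Content-Type: ' // content_type // CR_LF // &
                   'Content-Length: ' // trim(length) // CR_LF // &
                   CR_LF // &
                   body
    end function http_response
end program main
```

The returned string could be passed to http_send() in place of the static response.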
