ZeroMQ
ZeroMQ (also written ØMQ or ZMQ) is a lightweight messaging library written in C++ that extends the standard socket interface with in-process, inter-process, TCP, and multicast transports. Bindings are available for many programming languages.
The library can be integrated into Fortran applications to implement, for example, asynchronous message queues or request-reply patterns for remote procedure calls (RPC). Several messaging patterns are available across multiple transport protocols:
- Request-Reply: connects a set of clients to a set of services, for remote procedure calls and task distribution.
- Pub-Sub: a data distribution pattern that connects a set of publishers to a set of subscribers.
- Pipeline: a fan-out/fan-in pattern that can have multiple steps and loops, for parallel task distribution and collection.
- Exclusive Pair: connects exactly two sockets, typically two threads in a process.
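As an illustration of the first pattern, the following sketch runs one request-reply exchange between two OpenMP threads over the in-process transport. It uses the FZMQ bindings (see Installation) and assumes that their zmq module exports the constants ZMQ_REQ and ZMQ_REP alongside the procedures used elsewhere on this page. The endpoint name inproc://reqrep and the variable answer are made up for the example.

```fortran
! reqrep.f90
program reqrep
    use, intrinsic :: iso_c_binding
    use :: omp_lib
    use :: zmq
    implicit none
    character(len=*), parameter :: ENDPOINT = 'inproc://reqrep' ! hypothetical name
    integer(kind=c_int), target :: value
    integer(kind=c_int)         :: answer
    integer                     :: rc
    type(c_ptr)                 :: context
    type(c_ptr)                 :: socket

    ! The two sections wait for each other, so two threads are required.
    call omp_set_dynamic(.false.)
    call omp_set_num_threads(2)

    answer  = 0
    context = zmq_ctx_new()

    !$omp parallel shared(context, answer), private(socket, rc, value)
    !$omp sections
    !$omp section
    ! Service: wait for a request, double the value, send the reply.
    socket = zmq_socket(context, ZMQ_REP) ! ZMQ_REP: assumed to be exported by FZMQ
    rc = zmq_bind(socket, ENDPOINT)
    rc = zmq_recv(socket, c_loc(value), c_sizeof(value), 0)
    value = 2 * value
    rc = zmq_send(socket, c_loc(value), c_sizeof(value), 0)
    rc = zmq_close(socket)

    !$omp section
    ! Client: send a request, then block until the reply arrives.
    socket = zmq_socket(context, ZMQ_REQ) ! ZMQ_REQ: assumed to be exported by FZMQ
    rc = zmq_connect(socket, ENDPOINT)
    value = 21
    rc = zmq_send(socket, c_loc(value), c_sizeof(value), 0)
    rc = zmq_recv(socket, c_loc(value), c_sizeof(value), 0)
    answer = value
    rc = zmq_close(socket)
    !$omp end sections
    !$omp end parallel

    print '("Reply: ", i0)', answer
    rc = zmq_ctx_term(context)
end program reqrep
```

A REQ socket must alternate send and receive, and a REP socket receive and send. If the OpenMP runtime provides only one thread, the service section would wait forever for a request.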
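ZeroMQ was licensed under the GNU LGPL v3 (with a static linking exception) up to version 4.3.4. Since version 4.3.5, it is licensed under the Mozilla Public License v2.0.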
Installation
The ZeroMQ library version 4 runs on most Unix-like operating systems. On FreeBSD, install ZeroMQ with:
# pkg install net/libzmq4
There are two Fortran bindings available: F77_ZMQ (GNU LGPL v2.1), a now outdated interface for FORTRAN 77, and FZMQ (Mozilla Public License v2.0) for Fortran 2003. Build the FZMQ bindings with CMake. Pandoc must be installed to generate the documentation:
$ unzip fzmq-master.zip
$ cd fzmq-master/
$ mkdir build && cd build/
$ cmake ..
$ make
Once finished, the directory build/src/ contains the static library libfzmq.a and the module file zmq.mod for linking. Optionally, run make install to install the bindings globally.
Example
The example uses the sections construct of OpenMP to spawn two independent threads that exchange data through ZeroMQ’s in-process transport. Both threads use the same context, so the data is shared directly in memory.
! example.f90
program main
    use, intrinsic :: iso_c_binding
    use, intrinsic :: omp_lib
    use :: zmq
    implicit none
    integer, target :: data
    integer         :: rc
    type(c_ptr)     :: context
    type(c_ptr)     :: socket

    print '("ZeroMQ Test")'
    print '(a)', repeat('-', 32)

    ! Set OpenMP options.
    call omp_set_dynamic(.false.)
    call omp_set_num_threads(2)

    print '("Creating new context ...")'
    context = zmq_ctx_new()

    !$omp parallel shared(context), private(socket, rc, data)
    !$omp sections
    !$omp section
    ! Sender thread.
    print '("[Thread ", i0, "] Thread created")', omp_get_thread_num()
    socket = zmq_socket(context, ZMQ_PAIR)
    rc = zmq_connect(socket, 'inproc://fzmq')
    data = 123
    print '("[Thread ", i0, "] Sending value ", i0, " ...")', omp_get_thread_num(), data
    call send(socket, data)
    rc = zmq_close(socket)

    !$omp section
    ! Receiver thread.
    print '("[Thread ", i0, "] Thread created")', omp_get_thread_num()
    socket = zmq_socket(context, ZMQ_PAIR)
    rc = zmq_bind(socket, 'inproc://fzmq')
    call recv(socket, data)
    print '("[Thread ", i0, "] Received value ", i0)', omp_get_thread_num(), data
    rc = zmq_close(socket)
    !$omp end sections
    !$omp end parallel

    print '("Terminating context ...")'
    rc = zmq_ctx_term(context)
contains
    subroutine send(socket, data)
        !! Sends a single integer without copying it into the message.
        type(c_ptr),                  intent(inout) :: socket
        integer(kind=c_int), pointer, intent(in)    :: data
        integer(kind=c_int) :: nbytes
        integer(kind=c_int) :: rc
        type(zmq_msg_t)     :: message

        ! The message only references `data`; no free function is registered.
        rc = zmq_msg_init_data(message, c_loc(data), c_sizeof(data), c_null_ptr, c_null_ptr)
        nbytes = zmq_msg_send(message, socket, 0)
    end subroutine

    subroutine recv(socket, data)
        !! Receives a single integer.
        type(c_ptr), intent(inout) :: socket
        integer,     intent(out)   :: data
        integer(kind=c_int), pointer :: value
        integer         :: nbytes
        integer         :: rc
        type(c_ptr)     :: ptr
        type(zmq_msg_t) :: message

        rc = zmq_msg_init(message)
        nbytes = zmq_msg_recv(message, socket, 0)
        ptr = zmq_msg_data(message)

        ! View the message buffer as a C integer and copy the value out
        ! before the message is released.
        call c_f_pointer(ptr, value)
        data = value
        rc = zmq_msg_close(message)
    end subroutine
end program main
Copy the compiled bindings library libfzmq.a to lib/ and the module file zmq.mod to include/ inside the example project’s workspace directory. The directory should then contain the following files:
<project>/
    include/
        zmq.mod
    lib/
        libfzmq.a
    example.f90
Link the example against the static FZMQ bindings and the ZeroMQ library with -lfzmq -lzmq:
$ gfortran13 -Wall -fopenmp -I/usr/local/include -I./include -L/usr/local/lib -L./lib \
-o example example.f90 -lfzmq -lzmq
Running the executable outputs:
$ ./example
ZeroMQ Test
--------------------------------
Creating new context ...
[Thread 0] Thread created
[Thread 1] Thread created
[Thread 0] Sending value 123 ...
[Thread 1] Received value 123
Terminating context ...
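The receiving side has to turn the raw bytes of a message buffer back into a Fortran integer. The following sketch shows two ways of doing this in standard Fortran, without ZeroMQ, so it can be tried on its own. The module name bytes and the function int_from_buffer are made up for the example. An ordinary target variable stands in for the message buffer.

```fortran
! bytes.f90
module bytes
    use, intrinsic :: iso_c_binding
    implicit none
    private
    public :: int_from_buffer
contains
    function int_from_buffer(ptr) result(value)
        !! Reads one `c_int` from the memory that `ptr` points to.
        type(c_ptr), intent(in)      :: ptr
        integer(kind=c_int)          :: value
        integer(kind=c_int), pointer :: tmp

        call c_f_pointer(ptr, tmp) ! Associate `tmp` with the raw buffer.
        value = tmp                ! Copy the integer out of the buffer.
    end function int_from_buffer
end module bytes

program main
    use, intrinsic :: iso_c_binding
    use :: bytes
    implicit none
    integer(kind=c_int), target         :: source
    integer(kind=c_int)                 :: copy
    integer(kind=c_int8_t), allocatable :: raw(:)

    source = 123

    ! Read the value through a C pointer, as a receiver would do with
    ! the address of a message buffer.
    print '("Via c_f_pointer: ", i0)', int_from_buffer(c_loc(source))

    ! Explicit serialisation into a byte array and back.
    raw  = transfer(source, [ 0_c_int8_t ])
    copy = transfer(raw, copy)
    print '("Via transfer: ", i0, " (", i0, " bytes)")', copy, size(raw)
end program main
```

The pointer variant avoids an intermediate copy but is only valid while the buffer exists, so the value has to be copied out before the message is closed. Both variants assume that sender and receiver use the same integer size and byte order, which holds for threads of one process.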
HTTP Server
- Fig. 1: The served HTML document
ZeroMQ 4.0 introduced the ZMQ_STREAM socket type that lets us send and receive TCP streams asynchronously, either as client or server. This socket type makes it possible to implement a basic HTTP server in Fortran. The following program simply serves a static HTML document:
! http.f90
!
! Basic HTTP server written in Fortran 2003. Uses the `ZMQ_STREAM` socket of
! ZeroMQ 4. The server listens for incoming HTTP connections on 0.0.0.0:8080.
!
! Example has been ported from: http://api.zeromq.org/4-3:zmq-socket
program main
    use, intrinsic :: iso_c_binding
    use :: zmq
    implicit none
    integer(kind=c_size_t), parameter :: BUFFER_SIZE  = 4096_c_size_t
    character(len=*),       parameter :: CONTENT_TYPE = 'text/html'
    character(len=*),       parameter :: CR_LF        = achar(13) // achar(10)
    character(len=*),       parameter :: NL           = achar(10)
    character(len=*),       parameter :: ADDRESS      = '*' ! Or: '127.0.0.1'
    character(len=*),       parameter :: PORT         = '8080'
    character(len=*),       parameter :: HOST         = 'tcp://' // ADDRESS // ':' // PORT

    character(len=:), allocatable       :: request
    character(len=:), allocatable       :: response
    integer(kind=c_int8_t), allocatable :: id(:)
    integer                             :: rc
    type(c_ptr)                         :: context
    type(c_ptr)                         :: socket

    ! Create new ZeroMQ context and socket.
    context = zmq_ctx_new()
    socket = zmq_socket(context, ZMQ_STREAM)

    ! Bind to IP address and port.
    if (zmq_bind(socket, HOST) /= 0) then
        stop 'Error: binding to address ' // HOST // ' failed'
    end if

    print '("Serving ", a, " ...")', HOST

    response = 'HTTP/1.0 200 OK' // CR_LF // &
               'Content-Type: ' // CONTENT_TYPE // CR_LF // &
               CR_LF // &
               '<!DOCTYPE html>' // NL // &
               '<html lang="en">' // NL // &
               '<head>' // NL // &
               ' <meta charset="utf-8">' // NL // &
               ' <title>ZeroMQ HTTP Server</title>' // NL // &
               '</head>' // NL // &
               '<body bgcolor="#66cdaa">' // NL // &
               ' <h1>Hello, from Fortran!</h1></body>' // NL // &
               '</html>'

    ! Event loop.
    do
        ! Wait for HTTP request.
        call http_receive(socket, id, request)

        ! Without a routing id there is no peer to answer.
        if (.not. allocated(id)) cycle

        ! Send default HTTP response and HTML payload.
        call http_send(socket, id, response)
    end do

    ! Close socket and destroy context (not reached, as the loop never exits).
    rc = zmq_close(socket)
    rc = zmq_ctx_destroy(context)
contains
    subroutine http_receive(socket, id, request)
        !! Returns ZeroMQ message id and received HTTP request.
        type(c_ptr),                         intent(inout) :: socket
        integer(kind=c_int8_t), allocatable, intent(out)   :: id(:)
        character(len=:),       allocatable, intent(out)   :: request
        integer(kind=c_int8_t), target :: raw(BUFFER_SIZE)
        integer(kind=c_size_t)         :: i, nbytes

        ! Receive [id, ] message (with empty payload) on client connection.
        ! First, get the id frame. A negative result signals an error.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes <= 0_c_size_t) return
        nbytes = min(nbytes, BUFFER_SIZE)
        allocate (id(nbytes))
        id(:) = raw(1:nbytes)

        ! Then, get the empty payload frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes /= 0_c_size_t) return

        ! Receive [id, payload] message containing the HTTP request.
        ! First, get the id frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        if (nbytes <= 0_c_size_t) return

        ! Then, get the payload frame.
        nbytes = zmq_recv(socket, c_loc(raw), BUFFER_SIZE, 0)
        nbytes = min(nbytes, BUFFER_SIZE)

        ! Convert `c_int8_t` to Fortran character.
        allocate (character(len=nbytes) :: request)

        do i = 1, nbytes
            request(i:i) = achar(raw(i))
        end do
    end subroutine http_receive

    subroutine http_send(socket, id, response)
        !! Sends a ZeroMQ message containing the HTTP response.
        type(c_ptr),                    intent(inout) :: socket
        integer(kind=c_int8_t), target, intent(in)    :: id(:)
        character(len=*),       target, intent(in)    :: response
        integer(kind=c_size_t) :: id_size
        integer(kind=c_size_t) :: response_size
        integer                :: rc

        id_size = size(id, kind=c_size_t)
        response_size = len(response, kind=c_size_t)

        ! Send the routing id frame followed by the response.
        rc = zmq_send(socket, c_loc(id), id_size, ZMQ_SNDMORE)
        rc = zmq_send(socket, c_loc(response), response_size, 0)

        ! Close the connection by sending the routing id frame followed by
        ! a zero-length frame.
        rc = zmq_send(socket, c_loc(id), id_size, ZMQ_SNDMORE)
        rc = zmq_send(socket, c_null_ptr, 0_c_size_t, 0)
    end subroutine http_send
end program main
Compile, link, and run the HTTP server with:
$ gfortran13 -I/usr/local/include -I./include -L/usr/local/lib -L./lib -o http http.f90 -lfzmq -lzmq
$ ./http
Serving tcp://*:8080 ...
Point the include and library search paths to the directories containing zmq.mod and libfzmq.a, respectively. Then open a web browser and access http://127.0.0.1:8080/.
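The response served above carries no Content-Length header, so the client only learns where the body ends when the server closes the connection. The following sketch shows one way to build a response that states the body length explicitly. The module name http_util and the function http_response are made up for the example and are not part of ZeroMQ or FZMQ.

```fortran
! response.f90
module http_util
    implicit none
    private
    public :: http_response

    character(len=*), parameter :: CR_LF = achar(13) // achar(10)
contains
    function http_response(body, content_type) result(response)
        !! Returns an HTTP/1.0 response with `Content-Length` header.
        character(len=*), intent(in)  :: body
        character(len=*), intent(in)  :: content_type
        character(len=:), allocatable :: response
        character(len=20)             :: length

        ! Number of characters in the body, one byte each for the
        ! default character kind.
        write (length, '(i0)') len(body)

        response = 'HTTP/1.0 200 OK' // CR_LF // &
                   'Content-Type: ' // content_type // CR_LF // &
                   'Content-Length: ' // trim(length) // CR_LF // &
                   CR_LF // body
    end function http_response
end module http_util

program main
    use :: http_util
    implicit none
    character(len=:), allocatable :: response

    response = http_response('<h1>Hello, from Fortran!</h1>', 'text/html')
    print '(a)', response
end program main
```

The returned string could be passed to a routine like http_send in place of the fixed response. The length is counted in characters of the default kind, which matches the byte count only as long as the body is stored one byte per character.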
Fortran Libraries
References
- zguide: The ZeroMQ Guide