About

This is a WIP guide

I believe that ZeroMQ is a diamond in the rough that is still in the early adoption stage despite being a mature project. In my opinion, the lack of traction for the library is due to the community rather than the technology itself.

ZeroMQ has a steep learning curve because of its unfamiliar concepts: it requires the programmer to think differently about messaging. Combined with lacking documentation, this means that it takes a significant amount of time to properly understand how the library works and how it can be used.

I want this guide to reduce that time by explaining the key concepts of ZeroMQ and giving real-world usage examples, as well as a general vision of how it could be used. To do so, we will use libzmq-rs, a library aimed at making ZeroMQ dead simple to use.

Reading Tips

  • There is a search-bar friendly glossary at the end of the guide.

Basics

This is the minimal set of concepts required to get a decent grasp of libzmq.

Socket

The concept of a socket in ZeroMQ is completely novel. A ZeroMQ socket differs from a traditional TCP socket in many ways, including the following:

  • A socket sends and receives atomic messages; messages are guaranteed to either be transmitted in their entirety, or not transmitted at all.
  • A socket sends and receives messages asynchronously.
  • A socket can transmit messages over many supported transports, including TCP.
  • Incoming and outgoing messages can be queued and transmitted asynchronously by a background I/O thread.
  • A socket can be connected to zero or more peers at any time.
  • A socket can be bound to zero or more endpoints at any time. Each bound endpoint can accept connections from zero or more peers.
  • Peer reconnection and disconnection are handled in the background.
  • A socket supports many authentication and encryption strategies via a Mechanism.

Basics

These are the basic methods required to use a socket.

Connect

The socket connect method is used to connect to a peer socket that is bound at an endpoint, so that the two sockets can communicate. Usually a client socket will connect to a server socket, but it could be the other way around.


let addr: TcpAddr = "8.8.8.8:420".try_into()?;
client.connect(addr)?;

Calling connect on a socket is not guaranteed to connect to the peer right away. Usually, the actual connect call will be delayed until it is needed (e.g. when sending a message).

Connections in ZeroMQ differ from traditional connections in that they handle failure automatically. For instance, if a connection fails because of a network error, it will be automatically reconnected if possible.

Furthermore, to successfully connect to a peer, the handshake corresponding to the mechanism used must succeed. This handshake is also done in the background and might fail for various reasons.

Bind

The socket bind method is used to bind a local endpoint to accept connections from peers. Usually a server socket will bind a known endpoint so that other sockets can connect to it.


let addr: TcpAddr = "127.0.0.1:*".try_into()?;
server.bind(addr)?;

Contrary to connect, bind attempts to bind to the endpoint straight away. If the bind call succeeds, the socket starts accepting connection attempts on this endpoint.
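The "bind happens right away" behaviour and the `*` (system-assigned port) syntax have a close analogue in Rust's standard library. The sketch below uses `std::net::TcpListener`, not libzmq-rs. Binding to port 0 asks the OS for a free port, and `local_addr()` reports which port was chosen, much like `last_endpoint()` does in the TCP examples of this guide. The helper name `bind_ephemeral` is hypothetical.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener};

/// Hypothetical helper (std analogue, not libzmq-rs): binds to an OS-assigned
/// port (port 0), similar in spirit to the "127.0.0.1:*" endpoint.
pub fn bind_ephemeral() -> io::Result<(TcpListener, SocketAddr)> {
    // The bind is attempted immediately, so errors such as
    // "address in use" are reported here rather than later.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // Ask which address (and port) was actually bound.
    let bound = listener.local_addr()?;
    Ok((listener, bound))
}
```

Because the bind is eager, a second bind to the same address fails on the spot while the first listener is still alive.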

Send

This is a fundamental socket operation used to transfer messages to another socket. To be able to send messages, a socket must implement the SendMsg trait.


client.send(msg)?;

When send is called on a socket, it attempts to queue the message in its outgoing buffer. If the buffer is full, meaning it has reached the high water mark, the operation blocks. If send_timeout is set to Period::Infinite, the operation blocks until the buffer can accommodate the message. Otherwise, if the timeout is set to Period::Finite(Duration), it attempts to queue the message for that duration and returns [WouldBlock] if it fails.

There is also the try_send method which will return with [WouldBlock] immediately if it cannot queue the message.
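The queueing behaviour can be modelled with a bounded channel from Rust's standard library. This is a conceptual sketch, not libzmq-rs code. The channel capacity plays the role of the send high water mark, and `TrySendError::Full` plays the role of [WouldBlock]. Nothing drains the channel, which mimics a peer that is down or not reading. The function names `outgoing_buffer` and `try_queue` are hypothetical.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Hypothetical toy outgoing buffer: a bounded channel whose capacity
/// stands in for the send high water mark (HWM).
pub fn outgoing_buffer(hwm: usize) -> (SyncSender<String>, Receiver<String>) {
    sync_channel(hwm)
}

/// Tries to queue `n` messages without blocking, like `try_send`.
/// Returns (messages queued, attempts that hit the HWM).
pub fn try_queue(tx: &SyncSender<String>, n: usize) -> (usize, usize) {
    let (mut queued, mut would_block) = (0, 0);
    for i in 0..n {
        match tx.try_send(format!("msg {}", i)) {
            Ok(()) => queued += 1,
            // Buffer full: the analogue of libzmq returning WouldBlock.
            Err(TrySendError::Full(_)) => would_block += 1,
            // Receiver dropped: nothing more can be queued.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (queued, would_block)
}
```

With a high water mark of 3 and nobody reading, 3 of 5 attempts are queued and the other 2 would block. A blocking `SyncSender::send` would instead wait for space, which corresponds to `Period::Infinite`.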

Queued messages are sent to the peer socket by a background I/O thread. For the messages to actually be sent, two conditions must be met:

  • The connection with the peer socket is up.
  • The peer socket can receive messages (its incoming buffer is not full).

Conceptually, a full outgoing buffer can mean many things:

  • The connection has crashed temporarily (network error etc.)
  • The peer socket has crashed and is restarting.
  • The peer socket receives messages more slowly than we send them (this is a back-throttling mechanism).
  • Etc.

Many of these scenarios are conceptually indistinguishable. Therefore the user has to decide what to do depending on the context.

Recv

You guessed it, recv is a socket operation used to receive messages from another socket. To be able to receive messages, a socket must implement the RecvMsg trait.


let msg = client.recv_msg()?;

Calling recv on a socket attempts to extract a message from its incoming buffer. If the incoming buffer is empty, the operation blocks until a message arrives in the buffer. If recv_timeout is specified, it tries to extract a message from the buffer for the given duration and returns [WouldBlock] if it fails.

There is also the try_recv method which, similarly to try_send, returns [WouldBlock] immediately if no message is available in the incoming buffer.
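The three receive flavours (blocking, with a timeout, and non-blocking) map neatly onto `std::sync::mpsc::Receiver`, which offers `recv`, `recv_timeout` and `try_recv`. The sketch below is a std-only model, not libzmq-rs. The `RecvOutcome` enum and both helper names are hypothetical, and `WouldBlock` here only mirrors the guide's [WouldBlock].

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError, TryRecvError};
use std::time::Duration;

/// Hypothetical result of a receive attempt on a toy incoming buffer.
#[derive(Debug, PartialEq)]
pub enum RecvOutcome {
    Msg(String),
    /// Nothing arrived in time: mirrors the guide's WouldBlock.
    WouldBlock,
    /// All senders are gone; std channels report this explicitly.
    Disconnected,
}

/// Like recv with a recv_timeout: wait up to `timeout` for a message.
pub fn recv_with_timeout(rx: &Receiver<String>, timeout: Duration) -> RecvOutcome {
    match rx.recv_timeout(timeout) {
        Ok(msg) => RecvOutcome::Msg(msg),
        Err(RecvTimeoutError::Timeout) => RecvOutcome::WouldBlock,
        Err(RecvTimeoutError::Disconnected) => RecvOutcome::Disconnected,
    }
}

/// Like try_recv: return immediately if the buffer is empty.
pub fn try_recv(rx: &Receiver<String>) -> RecvOutcome {
    match rx.try_recv() {
        Ok(msg) => RecvOutcome::Msg(msg),
        Err(TryRecvError::Empty) => RecvOutcome::WouldBlock,
        Err(TryRecvError::Disconnected) => RecvOutcome::Disconnected,
    }
}
```

Note that an empty buffer yields the same `WouldBlock` whether the peer is slow, idle or unreachable. This is the indistinguishability the guide describes. One difference: ZeroMQ reconnects in the background instead of reporting a disconnect.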

The background I/O thread fills the incoming buffer with messages from the peer socket. For the messages to actually be received, two conditions must be met:

  • The connection with the peer socket is up.
  • The incoming buffer is not full.

Conceptually, an empty incoming buffer can mean many things:

  • The socket receives messages faster than the peer sends them.
  • The peer has no messages to send.
  • The connection has a network error.
  • The peer has crashed.
  • Etc.

Like before, many of these scenarios are conceptually indistinguishable. We have to decide what to do depending on the context.

Patterns

These are the most basic socket patterns in libzmq.

Client-Server

The Client-Server pattern is an advanced asynchronous request-reply pattern.

The Server receives messages with a unique RoutingId associated with a Client. This RoutingId can be used by the Server to route replies to the Client.

       <───> client
server <───> client
       <───> client

The Client socket receives upstream messages in a fair-queued fashion.

server ─┐
server ────> client
server ─┘
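To make the routing idea concrete, here is a std-only toy model of a server that assigns each client a routing id and routes replies by that id. It is not the libzmq-rs `Server`. `ToyServer`, `accept`, `outbox` and the `u32` routing id are hypothetical stand-ins, and the "unknown id" error only loosely resembles the HostUnreachable error used later in this guide.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a routing id.
pub type RoutingId = u32;

/// Hypothetical toy server: one outgoing queue per connected client.
#[derive(Default)]
pub struct ToyServer {
    peers: HashMap<RoutingId, Vec<String>>,
    next_id: RoutingId,
}

impl ToyServer {
    /// A client connects and is assigned a fresh routing id.
    pub fn accept(&mut self) -> RoutingId {
        self.next_id += 1;
        self.peers.insert(self.next_id, Vec::new());
        self.next_id
    }

    /// Routes a reply to exactly one client, identified by its routing id.
    pub fn route(&mut self, msg: &str, id: RoutingId) -> Result<(), String> {
        match self.peers.get_mut(&id) {
            Some(queue) => {
                queue.push(msg.to_string());
                Ok(())
            }
            // Unknown peer: loosely comparable to HostUnreachable.
            None => Err(format!("unknown routing id {}", id)),
        }
    }

    /// Messages waiting for a given client (empty if the id is unknown).
    pub fn outbox(&self, id: RoutingId) -> &[String] {
        self.peers.get(&id).map(|q| q.as_slice()).unwrap_or(&[])
    }
}
```

A reply routed to one client never shows up in another client's queue. This is why the examples below read `routing_id()` from each request before replying.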

Radio-Dish

The Radio-Dish pattern is an asynchronous publish-subscribe pattern.

The Radio socket sends messages in a fan-out fashion to all Dish sockets that joined the message's Group.

      ────> dish
radio ────> dish
      ────> dish

The Dish socket receives messages from the Groups it has joined in a fair-queued fashion.

radio ─┐
radio ────> dish
radio ─┘
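Group-based fan-out boils down to a membership table: a message sent to a group reaches every dish that joined that group, and no other dish. The std-only sketch below models only that table, not the sockets or their background delivery. `ToyRadio`, `join` and `recipients` are hypothetical names, and dishes are identified by plain `usize` ids for simplicity.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical toy model: which dishes joined which groups.
#[derive(Default)]
pub struct ToyRadio {
    groups: HashMap<String, HashSet<usize>>,
}

impl ToyRadio {
    /// A dish joins a group.
    pub fn join(&mut self, dish: usize, group: &str) {
        self.groups.entry(group.to_string()).or_default().insert(dish);
    }

    /// Fan-out: every dish that would receive a message sent to `group`,
    /// sorted for stable output. Dishes outside the group get nothing.
    pub fn recipients(&self, group: &str) -> Vec<usize> {
        let mut dishes: Vec<usize> = self
            .groups
            .get(group)
            .map(|members| members.iter().copied().collect())
            .unwrap_or_default();
        dishes.sort_unstable();
        dishes
    }
}
```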

Scatter-Gather

The Scatter-Gather pattern is an asynchronous pipeline pattern.

The Scatter socket sends messages downstream in a round-robin fashion.

         ┌──> gather
scatter ────> gather
         └──> gather

The Gather socket receives upstream messages in a fair-queued fashion.

scatter ─┐
scatter ───> gather
scatter ─┘
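The two distribution strategies used above can each be expressed in a few lines. The sketch below is a std-only model of the scheduling logic, not of the sockets themselves: `scatter` and `fair_queue` are hypothetical names. Round-robin sends message `i` to peer `i % n`. Fair-queueing takes at most one message from each non-empty upstream per pass, so that a busy upstream cannot starve the others. The same fair-queueing applies to the Client and Dish sockets described earlier.

```rust
use std::collections::VecDeque;

/// Round-robin (Scatter side): message `i` goes to gather socket `i % n_gathers`.
pub fn scatter(msgs: &[&str], n_gathers: usize) -> Vec<Vec<String>> {
    assert!(n_gathers > 0, "need at least one gather socket");
    let mut per_gather = vec![Vec::new(); n_gathers];
    for (i, msg) in msgs.iter().enumerate() {
        per_gather[i % n_gathers].push(msg.to_string());
    }
    per_gather
}

/// Fair-queueing (Gather side): visit each upstream in turn, taking at most
/// one message from each per pass, until every upstream is empty.
pub fn fair_queue(mut upstreams: Vec<VecDeque<String>>) -> Vec<String> {
    let mut merged = Vec::new();
    loop {
        let mut took_any = false;
        for queue in upstreams.iter_mut() {
            if let Some(msg) = queue.pop_front() {
                merged.push(msg);
                took_any = true;
            }
        }
        if !took_any {
            break;
        }
    }
    merged
}
```

For example, three upstreams holding `[a1, a2]`, `[b1]` and `[c1, c2, c3]` are merged as `a1, b1, c1, a2, c2, c3`.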

Advanced

Now that the basics are taken care of, let's dig deeper.

Custom Protocols

For two peers to be able to communicate, they must share a contract. In the world of communication, these contracts are called protocols. ZeroMQ enables programmers to create protocols that suit their needs by removing most of the boilerplate.

You might have realized by now that there is no strict concept of request-reply as a socket operation. Indeed, the library does not force a client socket to follow a send call with a recv call. This does not mean, however, that this strict type of request-reply cannot be achieved. To do so, a programmer could write the following code:


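// Client side: send exactly one request, then wait for exactly one reply.
fn request_reply(&mut self, msg: Msg) -> Result<Msg, Error> {
    // On failure, send hands back the unsent Msg inside the error;
    // Error::cast drops it so the error type matches the return type.
    self.client.send(msg).map_err(Error::cast)?;
    self.client.recv_msg()
}

// Server side: wait for one request, then route exactly one reply.
fn run(&mut self) -> Result<(), Error> {
    loop {
        let request = self.server.recv_msg()?;
        // The routing id identifies which client the reply goes to.
        let id = request.routing_id().unwrap();
        let reply = self.on_request(request)?;
        self.server.route(reply, id).map_err(Error::cast)?;
    }
}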

This creates an implicit contract between the client and the server. We will disregard the error handling and timeouts for simplicity.

  • The client must send one request at a time and wait for one reply.
  • The server must wait for a request and send one reply.

Since the contract must be enforced at the application level, it must be properly documented so that developers can respect it.

ZeroMQ does not enforce a particular messaging protocol, instead it offers all the tools to build one.
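One way to make such an implicit contract explicit on the client side is a small state guard that is checked before each send and after each recv. The sketch below is hypothetical application code (`ReqGuard` and `ContractError` are not part of libzmq-rs). It tracks whether a request is in flight and rejects any call that would break the "one request, then one reply" rule.

```rust
/// Hypothetical violations of the request-reply contract.
#[derive(Debug, PartialEq)]
pub enum ContractError {
    /// A second request was attempted before the reply to the first.
    RequestInFlight,
    /// A reply was expected although no request was sent.
    NoRequestInFlight,
}

/// Hypothetical client-side guard: one request, then exactly one reply.
#[derive(Default)]
pub struct ReqGuard {
    in_flight: bool,
}

impl ReqGuard {
    /// Call before sending a request.
    pub fn on_send(&mut self) -> Result<(), ContractError> {
        if self.in_flight {
            return Err(ContractError::RequestInFlight);
        }
        self.in_flight = true;
        Ok(())
    }

    /// Call after receiving a reply.
    pub fn on_recv(&mut self) -> Result<(), ContractError> {
        if !self.in_flight {
            return Err(ContractError::NoRequestInFlight);
        }
        self.in_flight = false;
        Ok(())
    }
}
```

Wrapping the client's send and recv calls with these checks turns an undocumented convention into a runtime error. The contract still has to be documented for the server side.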

Examples

Here are a few usage examples of varying complexity.

Basic Request Reply

This is as simple as it gets. We have a Server that does some request-reply work in a dedicated thread, and a Client that sends a "ping" and gets a "pong" back. There is no attempt at security and no attempt at error handling. For an INPROC server, that might be enough.

use libzmq::{prelude::*, *};

use std::thread;

fn main() -> Result<(), anyhow::Error> {
    let addr: InprocAddr = InprocAddr::new_unique();

    let server = ServerBuilder::new().bind(&addr).build()?;

    // Spawn the server thread.
    let handle = thread::spawn(move || -> Result<(), Error> {
        loop {
            let request = server.recv_msg()?;
            assert_eq!(request.to_str(), Ok("ping"));

            // Retrieve the routing_id to route the reply to the client.
            let id = request.routing_id().unwrap();
            // We cast the Error<Msg> to Error<()>. This drops the Msg.
            server.route("pong", id).map_err(Error::cast)?;
        }
    });

    let client = ClientBuilder::new().connect(addr).build()?;

    // Do some request-reply work.
    client.send("ping")?;
    let msg = client.recv_msg()?;
    assert_eq!(msg.to_str(), Ok("pong"));

    // This will cause the server to fail with `InvalidCtx`.
    Ctx::global().shutdown();

    // Join with the thread.
    let err = handle.join().unwrap().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::InvalidCtx);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::main;

    #[test]
    fn main_runs() {
        main().unwrap();
    }
}

Reliable Request Reply

This example adapts the code from the previous Basic Request Reply example to the TCP transport.

Note that this example does not make any attempt at security.

Since TCP is a connection-oriented transport, we have to take into account that the connection might fail at any time. We use heartbeating to detect failures, as well as send and recv timeouts to avoid blocking forever.

In this example, the server is protected against failures since it will drop messages if it is unable to route them before send_timeout expires (WouldBlock), or it detects that the peer disconnected via the heartbeats (HostUnreachable).

In this case, the client simply fails if it is unable to send a request before the send_timeout expires, or unable to receive a reply before the recv_timeout expires (WouldBlock). The client might then choose to retry later, connect to another server, etc.
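As a side sketch of the "retry later" option, here is a generic retry helper in plain Rust. `with_retries` is a hypothetical name and not part of libzmq-rs. In the client, `op` would wrap one send-then-recv exchange that can fail with WouldBlock. The helper returns the first success, or the last error once the attempt budget is exhausted.

```rust
/// Hypothetical retry helper: calls `op(attempt)` up to `max_attempts` times
/// (attempts are numbered from 1). Returns the first success or the last error.
pub fn with_retries<T, E, F>(max_attempts: usize, mut op: F) -> Result<T, E>
where
    F: FnMut(usize) -> Result<T, E>,
{
    assert!(max_attempts > 0, "need at least one attempt");
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: give up and surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise try again.
            Err(_) => attempt += 1,
        }
    }
}
```

One caveat: if the request reached the server but the reply was lost, a retry makes the server process the request twice. Blind retries are therefore only safe for idempotent requests such as "ping".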

use libzmq::{prelude::*, *};

use std::{thread, time::Duration};

fn main() -> Result<(), anyhow::Error> {
    // We use a system assigned port here.
    let addr: TcpAddr = "127.0.0.1:*".try_into()?;
    let duration = Duration::from_millis(300);

    let hb = Heartbeat::new(duration)
        .add_timeout(3 * duration)
        .add_ttl(3 * duration);

    let server = ServerBuilder::new()
        .bind(addr)
        .send_timeout(duration)
        .heartbeat(&hb)
        .build()?;

    // Retrieve the assigned port.
    let bound = server.last_endpoint()?;

    // Spawn the server thread. In a real application, this
    // would be on another node.
    let handle = thread::spawn(move || -> Result<(), Error> {
        use ErrorKind::*;
        loop {
            let request = server.recv_msg()?;
            assert_eq!(request.to_str(), Ok("ping"));

            // Retrieve the routing_id to route the reply to the client.
            let id = request.routing_id().unwrap();
            if let Err(err) = server.route("pong", id) {
                match err.kind() {
                    // Cannot route msg, drop it.
                    WouldBlock | HostUnreachable => (),
                    _ => return Err(err.cast()),
                }
            }
        }
    });

    let client = ClientBuilder::new()
        .connect(bound)
        .recv_timeout(duration)
        .send_timeout(duration)
        .heartbeat(hb)
        .build()?;

    // Do some request-reply work.
    client.send("ping")?;
    let msg = client.recv_msg()?;
    assert_eq!(msg.to_str(), Ok("pong"));

    // This will cause the server to fail with `InvalidCtx`.
    Ctx::global().shutdown();

    // Join with the thread.
    let err = handle.join().unwrap().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::InvalidCtx);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::main;

    #[test]
    fn main_runs() {
        main().unwrap();
    }
}
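The heartbeat settings above (an interval of 300 ms, with a timeout and TTL of 3 × 300 ms) determine roughly how long a dead peer can go unnoticed. The sketch below is a back-of-the-envelope estimate under a simplified model, which is an assumption and not libzmq's actual implementation. In this model a ping goes out every `interval`, and the connection is dropped if no traffic arrives within `timeout` of that ping. A peer that dies just after answering a ping is then noticed at most one interval plus one timeout later. The function name is hypothetical.

```rust
use std::time::Duration;

/// Hypothetical estimate under a simplified heartbeat model (an assumption,
/// not libzmq's code): the worst-case delay before a silent peer is detected
/// is one ping interval plus one heartbeat timeout.
pub fn worst_case_detection(interval: Duration, timeout: Duration) -> Duration {
    interval + timeout
}
```

Under this model, the settings above give about 1.2 s. The 1 s interval and 3 s timeout in the YAML configuration of the next example give about 4 s.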

Secure Request Reply

The previous example offered neither authentication nor encryption. For a public TCP connection, both are a must. Let's fix that by adapting the previous example.

This time we will use the CURVE mechanism, which is based on public-key cryptography. To use the CURVE mechanism, the 'curve' feature flag must be enabled.

In addition, this time we will use an external configuration file to get rid of all the boilerplate. This also allows our application to run independently of the socket configuration.

Based on some basic benchmarks, the CURVE mechanism might halve the throughput of I/O-heavy applications due to the overhead of the Salsa20 encryption.

Config File

In this case we use a YAML configuration file, but any file format supported by Serde will work (as long as it supports typed enums).

# The curve keys were generated by running:
# `$ cargo run --example gen_curve_cert`

auth:
  # The public keys allowed to authenticate. Note that this is
  # the client's public key.
  curve_registry:
    - "n%3)5@(3pzp)v8yt6RW3eQVq5OQYb#TEodD^6oA^"

client:
  # In a real life scenario the server would have a known addr.
  #connect:
  #  - tcp: "127.0.0.1:3000"
  heartbeat:
      interval: 1s
      timeout: 3s
      ttl: 3s
  send_high_water_mark: 10
  send_timeout: 300ms
  recv_high_water_mark: 100
  recv_timeout: 300ms
  mechanism:
    curve_client:
      client:
        public: "n%3)5@(3pzp)v8yt6RW3eQVq5OQYb#TEodD^6oA^"
        secret: "JiUDa>>owH1+mPTWs=>Jcyt%h.C1E4Js>)(g{geY"
      # This is the server's public key.
      server: "et189NB9uJC7?J+XU8JRhCbF?gOP9+o%kli=y2b8"

server:
  # Here we use a system defined port so as to not conflict with the host
  # machine. In a real life scenario we would have a port available.
  bind:
    - tcp: "127.0.0.1:*"
  heartbeat:
      interval: 1s
      timeout: 3s
      ttl: 3s
  mechanism:
    curve_server:
      secret: "iaoRiIVA^VgV:f4a<@{8K{cP62cE:dh=4:oY+^l("

The code

Aside from the additional logic for reading the config file, the code is now simpler than before.

use libzmq::{config::*, prelude::*, *};

use serde::{Deserialize, Serialize};

use std::{
    fs::File,
    io::Read,
    path::{Path, PathBuf},
    thread,
};

const CONFIG_FILE: &str = "secure_req_rep.yml";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    auth: AuthConfig,
    client: ClientConfig,
    server: ServerConfig,
}

fn read_file(name: &Path) -> std::io::Result<Vec<u8>> {
    let mut file = File::open(name)?;
    let mut buf = Vec::new();
    file.read_to_end(&mut buf)?;
    Ok(buf)
}

fn main() -> Result<(), anyhow::Error> {
    let path = PathBuf::from("examples").join(CONFIG_FILE);

    let config: Config =
        serde_yaml::from_slice(&read_file(&path).unwrap()).unwrap();

    // Configure the `AuthServer`. We won't need the returned `AuthClient`.
    let _ = config.auth.build()?;

    // Configure our two sockets.
    let server = config.server.build()?;
    let client = config.client.build()?;

    // Once again we used a system assigned port for our server.
    let bound = server.last_endpoint()?;
    client.connect(bound)?;

    // Spawn the server thread. In a real application, this
    // would be on another node.
    let handle = thread::spawn(move || -> Result<(), Error> {
        use ErrorKind::*;
        loop {
            let request = server.recv_msg()?;
            assert_eq!(request.to_str(), Ok("ping"));

            // Retrieve the routing_id to route the reply to the client.
            let id = request.routing_id().unwrap();
            if let Err(err) = server.route("pong", id) {
                match err.kind() {
                    // Cannot route msg, drop it.
                    WouldBlock | HostUnreachable => (),
                    _ => return Err(err.cast()),
                }
            }
        }
    });

    // Do some request-reply work.
    client.send("ping")?;
    let msg = client.recv_msg()?;
    assert_eq!(msg.to_str(), Ok("pong"));

    // This will cause the server to fail with `InvalidCtx`.
    Ctx::global().shutdown();

    // Join with the thread.
    let err = handle.join().unwrap().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::InvalidCtx);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::main;

    #[test]
    fn main_runs() {
        main().unwrap();
    }
}

Glossary

Some high level definitions.

Endpoint

An endpoint is a rendezvous address for a specified transport. The syntax of the address depends on the nature of the transport. For instance, a TcpAddr is an endpoint over the TCP transport.

Transport

A protocol used to transfer data. It could be a network protocol, such as TCP, an inter-thread protocol, such as INPROC, etc.

Connection

Connections in ZeroMQ differ from traditional connections in that they handle failure automatically. For instance, if a connection fails because of a network error, it will be automatically reconnected if possible. Thus sockets should not have to worry about the state of a given connection.

Message

An atomic arbitrary set of bytes owned by the ZeroMQ engine. ZeroMQ does not know how to interpret these bytes, only the user does. Messages are the units that are transferred between sockets.

Socket

A ZeroMQ construct used to send and receive messages using connections across endpoints. The specific behavior of the socket depends on its type.

High Water Mark

The message limit in the incoming or outgoing buffer. If the incoming buffer has reached this limit, the socket will stop receiving messages in the background. If the outgoing buffer has reached this limit, attempting to queue a message will block the calling thread. Conceptually, this is a socket's back throttling mechanism.

Context

A ZeroMQ context is a session that keeps track of all the sockets, the messages, the background I/O threads and the internal queues.

Mechanism

A specific protocol used by sockets to authenticate and encrypt traffic.

Mute State

A socket that is in mute state is unable to queue and receive messages. This is likely because it has no peers. The condition for the mute state to occur depends on the socket type.