About
This is a WIP guide.
I believe that ZeroMQ is a diamond in the rough that is still in the early adoption stage despite being a mature project. In my opinion, the lack of traction for the library is due to the community rather than the technology itself. ZeroMQ suffers from a significant learning curve because of its unfamiliar concepts; it requires the programmer to think differently about messaging. Combined with sparse documentation, this results in a significant time investment to properly understand how the library works and how it can be used.
I want this guide to reduce that time investment by explaining the key concepts of ZeroMQ, giving real-world usage examples, and offering a general vision of how it could be used. To do so, we will use libzmq-rs, a library aimed at making ZeroMQ dead simple to use.
Reading Tips
- There is a search-bar friendly glossary at the end of the guide.
Basics
This is the minimal set of concepts required to get a decent grasp of libzmq.
Socket
The concept of a socket in ZeroMQ is completely novel. A ZeroMQ socket differs from a traditional TCP socket in the following ways, among others:
- A socket sends and receives atomic messages; messages are guaranteed to either be transmitted in their entirety, or not transmitted at all.
- A socket sends and receives messages asynchronously.
- A socket can transmit messages over many supported transports, including TCP.
- Incoming and outgoing messages can be queued and transmitted asynchronously by a background I/O thread.
- A socket can be connected to zero or more peers at any time.
- A socket can be bound to zero or more endpoints at any time. Each bound endpoint can listen to zero or more peers.
- Peer reconnection and disconnection is handled in the background.
- Support for many authentication and encryption strategies via Mechanism.
Basics
These are the basic methods required to use a socket.
Connect
The socket connect method is used to connect to a peer socket bound at an endpoint. Usually a client socket will connect to a server socket, but it could be the other way around.
let addr: TcpAddr = "8.8.8.8:420".try_into()?;
client.connect(addr)?;
Calling connect on a socket is not guaranteed to connect to the peer right away. Usually, the actual connect call will be delayed until it is needed (e.g. when sending a message).
Connections in ZeroMQ are different from traditional connections in the sense that they automatically handle failure. For instance, if a connection fails because of a network error, it will be automatically reconnected if possible.
Furthermore, to successfully connect to a peer, the handshake corresponding to the mechanism used must succeed. This handshake is also done in the background and might fail for various reasons.
Bind
The socket bind method is used to bind to a local endpoint to accept connections from peers. Usually a server socket will bind to a known endpoint so that other sockets can connect to it.
let addr: TcpAddr = "127.0.0.1:*".try_into()?;
server.bind(addr)?;
Contrary to connect, bind will attempt to bind to the endpoint straight away. If the bind call succeeds, the socket will start to accept connection attempts to this endpoint.
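Since binding to a wildcard port such as "127.0.0.1:*" lets the system assign the port, the bound endpoint is usually retrieved afterwards so that peers know where to connect. Here is a minimal sketch reusing the last_endpoint call from the examples later in this guide; it assumes a client socket built elsewhere.

// Bind to a system-assigned port, then retrieve the endpoint the
// socket actually bound to so a peer can connect to it.
let addr: TcpAddr = "127.0.0.1:*".try_into()?;
let server = ServerBuilder::new().bind(addr).build()?;

let bound = server.last_endpoint()?;
client.connect(bound)?;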
Send
This is a fundamental operation of a socket, used to transfer messages to another socket. To be able to send messages, a socket must implement the SendMsg trait.
client.send(msg)?;
When send is called on a socket, it will attempt to queue the message to its outgoing buffer. If the buffer is full, meaning it has reached the high water mark, the operation will block. If the send_timeout is set to Period::Infinite, the operation will block until the buffer can accommodate the message. Otherwise, if the timeout is set to Period::Finite(Duration), it will attempt to queue the message for that duration and, if it fails, return [WouldBlock].
There is also the try_send method which will return with [WouldBlock] immediately if it cannot queue the message.
Queued messages are sent to the peer socket by a background I/O thread. For the messages to actually be sent, two conditions must be met:
- The connection with the peer socket is up.
- The peer socket can receive messages (its incoming buffer is not full).
Conceptually, a full outgoing buffer can mean many things:
- The connection has crashed temporarily (network error etc.)
- The peer socket has crashed and is restarting.
- The peer socket receives messages slower than we can send them (thus this acts as a back-throttling mechanism).
- Etc.
Many of these scenarios are conceptually indistinguishable. Therefore the user has to decide what to do depending on the context.
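A common approach is to treat a full buffer as back-pressure and decide the policy at the call site. The following is a minimal sketch, assuming a Client socket; the send_or_drop helper is ours, and the exact try_send signature should be checked against the libzmq docs.

use libzmq::{prelude::*, *};

// Try to queue a message and decide what to do when the outgoing
// buffer is full, instead of blocking.
fn send_or_drop(client: &Client, msg: Msg) -> Result<(), Error> {
    match client.try_send(msg) {
        Ok(()) => Ok(()),
        Err(err) => match err.kind() {
            // High water mark reached: here we simply drop the message;
            // another application might retry later instead.
            ErrorKind::WouldBlock => Ok(()),
            _ => Err(err.cast()),
        },
    }
}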
Recv
You guessed it, recv is a socket operation used to receive messages from another socket. To be able to receive messages, a socket must implement the RecvMsg trait.
let msg = client.recv_msg()?;
Calling recv on a socket will attempt to extract a message from its incoming buffer. If the incoming buffer is empty, the operation will block until a message is received in the buffer. If the recv_timeout is specified, it will try to extract a message from the buffer for the given duration and return [WouldBlock] if it fails.
There is also the try_recv method which, similarly to try_send, will return with [WouldBlock] immediately if there is no message to extract.
The incoming buffer receives messages from the peer socket via the background I/O thread. For the messages to actually be received, two conditions must be met:
- The connection with the peer socket is up.
- The incoming buffer is not full.
Conceptually, an empty incoming buffer can mean many things:
- The socket receives messages faster than the peer can send them.
- The peer has no messages to send.
- The connection suffered a network error.
- The peer has crashed.
- Etc.
Like before, many of these scenarios are conceptually indistinguishable. We have to decide what to do depending on the context.
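For instance, when a recv_timeout is set, it is often convenient to treat [WouldBlock] as "no message right now" rather than a hard failure. Here is a minimal sketch; the poll_once helper name is ours, not part of the library.

use libzmq::{prelude::*, *};

// Attempt to receive a single message, mapping a `WouldBlock`
// timeout to `None` instead of an error.
fn poll_once(client: &Client) -> Result<Option<Msg>, Error> {
    match client.recv_msg() {
        Ok(msg) => Ok(Some(msg)),
        Err(err) => match err.kind() {
            // The `recv_timeout` expired before a message arrived.
            ErrorKind::WouldBlock => Ok(None),
            _ => Err(err),
        },
    }
}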
Patterns
These are the most basic socket patterns in libzmq.
Client-Server
The Client-Server pattern is an advanced asynchronous request-reply pattern.
The Server receives messages with a unique RoutingId associated with a Client. This RoutingId can be used by the Server to route replies to the Client, as the sketch after the diagrams shows.
       <───> client
server <───> client
       <───> client
The Client socket receives upstream messages in a fair-queued fashion.
server ─┐
server ────> client
server ─┘
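Putting the two diagrams together, here is a minimal request-reply sketch using only the calls from the examples later in this guide. It assumes client and server sockets built with ClientBuilder and ServerBuilder.

// Client side: send a request, then wait for the routed reply.
client.send("request")?;
let reply = client.recv_msg()?;

// Server side: read the request's RoutingId and use it to route
// the reply back to that specific client.
let request = server.recv_msg()?;
let id = request.routing_id().unwrap();
server.route("reply", id).map_err(Error::cast)?;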
Radio-Dish
The Radio-Dish pattern is an asynchronous publish-subscribe pattern.
The Radio socket sends messages in a fan-out fashion to all Dish sockets that have joined the message's Group.
      ────> dish
radio ────> dish
      ────> dish
The Dish socket receives messages from the Groups it has joined in a fair-queued fashion (see the sketch after the diagram).
radio ─┐
radio ────> dish
radio ─┘
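Here is a minimal sketch of the flow. Note that the names used here (RadioBuilder, DishBuilder, Group, join, set_group) mirror the ZeroMQ RADIO/DISH draft API and are assumptions on our part; check the libzmq documentation for the exact types and signatures.

// A sketch, not verified against the current libzmq API.
let addr: InprocAddr = InprocAddr::new_unique();

let radio = RadioBuilder::new().bind(&addr).build()?;
let dish = DishBuilder::new().connect(&addr).build()?;

// A Dish only receives messages for the groups it has joined.
let group: Group = "weather".try_into()?;
dish.join(&group)?;

// The Radio fans out each message to every Dish that joined the
// message's group.
let mut msg: Msg = "sunny".into();
msg.set_group(&group);
radio.send(msg)?;

let update = dish.recv_msg()?;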
Scatter-Gather
The Scatter-Gather pattern is an asynchronous pipeline pattern.
The Scatter socket sends messages downstream in a round-robin fashion.
        ┌──> gather
scatter ────> gather
        └──> gather
The Gather socket receives upstream messages in a fair-queued fashion (see the sketch after the diagram).
scatter ─┐
scatter ───> gather
scatter ─┘
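Here is a minimal sketch of a single pipeline stage. The ScatterBuilder and GatherBuilder names are assumed to follow the same builder pattern as ClientBuilder and ServerBuilder; check the libzmq documentation for the exact API.

// A sketch: push work items downstream to a pool of Gather sockets.
let addr: InprocAddr = InprocAddr::new_unique();

let gather = GatherBuilder::new().bind(&addr).build()?;
let scatter = ScatterBuilder::new().connect(&addr).build()?;

// Each message goes to exactly one downstream peer, chosen
// round-robin; with a single peer everything lands on `gather`.
scatter.send("job")?;
let job = gather.recv_msg()?;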
Advanced
Now that the basic stuff is taken care of, let's dig deeper.
Custom Protocols
For two peers to be able to communicate, they must share a contract. In the world of communication, these are called protocols. ZeroMQ enables programmers to create protocols that suit their needs by removing most of the boilerplate.
You might have realized by now that there is no strict concept of request-reply as a socket operation. Indeed, the library does not force a client socket to follow a send call with a recv call. This doesn't mean, however, that this strict type of request-reply cannot be achieved. To do so, a programmer could easily write the following code:
// Client side
fn request_reply(&mut self, msg: Msg) -> Result<Msg, Error> {
    self.client.send(msg)?;
    self.client.recv_msg()
}

// Server side
fn run(&mut self) -> Result<(), Error> {
    loop {
        let request = self.server.recv_msg()?;
        let reply = self.on_request(request)?;
        self.server.send(reply)?;
    }
}
This creates an implicit contract between the client and the server. We will disregard the error handling and timeouts for simplicity.
- The client must send one request at a time and wait for one reply.
- The server must wait for a request and send one reply.
Since the contract must be enforced at the application level, it must be properly documented for developers to be able to respect it.
ZeroMQ does not enforce a particular messaging protocol; instead, it offers all the tools to build one.
Examples
Here are a few usage examples of varying complexity.
Basic Request Reply
This is as simple as it gets. We have a Server that does some request-reply
work in a dedicated thread. We have a Client that sends a "ping" and gets
a "pong" back. There is no attempt at security and no attempt at error handling.
For an INPROC server, that might be enough.
use libzmq::{prelude::*, *};
use std::thread;

fn main() -> Result<(), anyhow::Error> {
    let addr: InprocAddr = InprocAddr::new_unique();

    let server = ServerBuilder::new().bind(&addr).build()?;

    // Spawn the server thread.
    let handle = thread::spawn(move || -> Result<(), Error> {
        loop {
            let request = server.recv_msg()?;
            assert_eq!(request.to_str(), Ok("ping"));

            // Retrieve the routing_id to route the reply to the client.
            let id = request.routing_id().unwrap();
            // We cast the Error<Msg> to Error<()>. This drops the Msg.
            server.route("pong", id).map_err(Error::cast)?;
        }
    });

    let client = ClientBuilder::new().connect(addr).build()?;

    // Do some request-reply work.
    client.send("ping")?;
    let msg = client.recv_msg()?;
    assert_eq!(msg.to_str(), Ok("pong"));

    // This will cause the server to fail with `InvalidCtx`.
    Ctx::global().shutdown();

    // Join with the thread.
    let err = handle.join().unwrap().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::InvalidCtx);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::main;

    #[test]
    fn main_runs() {
        main().unwrap();
    }
}
Reliable Request Reply
This is a basic example using the TCP transport, adapting the code from the previous Basic Request Reply example.
Note that this example does not make any attempt at security.
Since TCP is a connection-oriented transport, we have to take into account that the connection might fail at any time. We use heartbeating to detect failure, as well as send and recv timeouts to prevent blocking forever.
In this example, the server is protected against failures since it will drop messages if it is unable to route them before the send_timeout expires (WouldBlock), or if it detects that the peer disconnected via the heartbeats (HostUnreachable).
The client in this case will simply fail if it is unable to send a request before the send_timeout or unable to receive a reply before the recv_timeout (WouldBlock). The client might choose to retry later, connect to another server, etc.
use libzmq::{prelude::*, *};
use std::{thread, time::Duration};

fn main() -> Result<(), anyhow::Error> {
    // We use a system assigned port here.
    let addr: TcpAddr = "127.0.0.1:*".try_into()?;

    let duration = Duration::from_millis(300);

    let hb = Heartbeat::new(duration)
        .add_timeout(3 * duration)
        .add_ttl(3 * duration);

    let server = ServerBuilder::new()
        .bind(addr)
        .send_timeout(duration)
        .heartbeat(&hb)
        .build()?;

    // Retrieve the assigned port.
    let bound = server.last_endpoint()?;

    // Spawn the server thread. In a real application, this
    // would be on another node.
    let handle = thread::spawn(move || -> Result<(), Error> {
        use ErrorKind::*;
        loop {
            let request = server.recv_msg()?;
            assert_eq!(request.to_str(), Ok("ping"));

            // Retrieve the routing_id to route the reply to the client.
            let id = request.routing_id().unwrap();
            if let Err(err) = server.route("pong", id) {
                match err.kind() {
                    // Cannot route msg, drop it.
                    WouldBlock | HostUnreachable => (),
                    _ => return Err(err.cast()),
                }
            }
        }
    });

    let client = ClientBuilder::new()
        .connect(bound)
        .recv_timeout(duration)
        .send_timeout(duration)
        .heartbeat(hb)
        .build()?;

    // Do some request-reply work.
    client.send("ping")?;
    let msg = client.recv_msg()?;
    assert_eq!(msg.to_str(), Ok("pong"));

    // This will cause the server to fail with `InvalidCtx`.
    Ctx::global().shutdown();

    // Join with the thread.
    let err = handle.join().unwrap().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::InvalidCtx);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::main;

    #[test]
    fn main_runs() {
        main().unwrap();
    }
}
Secure Request Reply
The previous example offered neither authentication nor encryption. For a public TCP connection, that's a must. Let's fix that by adapting the previous example.
This time we will use the CURVE mechanism, which is based on public-key cryptography.
To enable the usage of the CURVE mechanism, the 'curve' feature flag must be enabled.
We will also use an external configuration file to get rid of all the boilerplate. This allows our application to run independently of the socket configuration.
Based on some basic benchmarks, the CURVE mechanism might reduce the throughput of I/O-heavy applications by half due to the overhead of the salsa20 encryption.
Config File
In this case we used a yaml configuration file, but any file format supported by Serde will work (as long as it supports typed enums).
# The curve keys were generated by running:
# `$ cargo run --example gen_curve_cert`
auth:
  # The public keys allowed to authenticate. Note that this is
  # the client's public key.
  curve_registry:
    - "n%3)5@(3pzp)v8yt6RW3eQVq5OQYb#TEodD^6oA^"
client:
  # In a real life scenario the server would have a known addr.
  #connect:
  #  - tcp: "127.0.0.1:3000"
  heartbeat:
    interval: 1s
    timeout: 3s
    ttl: 3s
  send_high_water_mark: 10
  send_timeout: 300ms
  recv_high_water_mark: 100
  recv_timeout: 300ms
  mechanism:
    curve_client:
      client:
        public: "n%3)5@(3pzp)v8yt6RW3eQVq5OQYb#TEodD^6oA^"
        secret: "JiUDa>>owH1+mPTWs=>Jcyt%h.C1E4Js>)(g{geY"
      # This is the server's public key.
      server: "et189NB9uJC7?J+XU8JRhCbF?gOP9+o%kli=y2b8"
server:
  # Here we use a system defined port so as to not conflict with the host
  # machine. In a real life scenario we would have a port available.
  bind:
    - tcp: "127.0.0.1:*"
  heartbeat:
    interval: 1s
    timeout: 3s
    ttl: 3s
  mechanism:
    curve_server:
      secret: "iaoRiIVA^VgV:f4a<@{8K{cP62cE:dh=4:oY+^l("
The code
Aside from the additional logic for reading the config file, the code is now simpler than before.
use libzmq::{config::*, prelude::*, *};
use serde::{Deserialize, Serialize};
use std::{
    fs::File,
    io::Read,
    path::{Path, PathBuf},
    thread,
};

const CONFIG_FILE: &str = "secure_req_rep.yml";

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
    auth: AuthConfig,
    client: ClientConfig,
    server: ServerConfig,
}

fn read_file(name: &Path) -> std::io::Result<Vec<u8>> {
    let mut file = File::open(name)?;
    let mut buf = Vec::new();
    file.read_to_end(&mut buf)?;

    Ok(buf)
}

fn main() -> Result<(), anyhow::Error> {
    let path = PathBuf::from("examples").join(CONFIG_FILE);
    let config: Config = serde_yaml::from_slice(&read_file(&path).unwrap()).unwrap();

    // Configure the `AuthServer`. We won't need the returned `AuthClient`.
    let _ = config.auth.build()?;

    // Configure our two sockets.
    let server = config.server.build()?;
    let client = config.client.build()?;

    // Once again we used a system assigned port for our server.
    let bound = server.last_endpoint()?;
    client.connect(bound)?;

    // Spawn the server thread. In a real application, this
    // would be on another node.
    let handle = thread::spawn(move || -> Result<(), Error> {
        use ErrorKind::*;
        loop {
            let request = server.recv_msg()?;
            assert_eq!(request.to_str(), Ok("ping"));

            // Retrieve the routing_id to route the reply to the client.
            let id = request.routing_id().unwrap();
            if let Err(err) = server.route("pong", id) {
                match err.kind() {
                    // Cannot route msg, drop it.
                    WouldBlock | HostUnreachable => (),
                    _ => return Err(err.cast()),
                }
            }
        }
    });

    // Do some request-reply work.
    client.send("ping")?;
    let msg = client.recv_msg()?;
    assert_eq!(msg.to_str(), Ok("pong"));

    // This will cause the server to fail with `InvalidCtx`.
    Ctx::global().shutdown();

    // Join with the thread.
    let err = handle.join().unwrap().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::InvalidCtx);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::main;

    #[test]
    fn main_runs() {
        main().unwrap();
    }
}
Glossary
Some high level definitions.
Endpoint
An endpoint is a rendez-vous address for a specified transport. The syntax of the address depends on the nature of the transport. For instance, a TcpAddr is an endpoint over the TCP transport.
Transport
A protocol to transfer data. It could be a network protocol, such as TCP, or an inter-thread protocol, such as INPROC, etc.
Connection
Connections in ZeroMQ are different from traditional connections in the sense that they automatically handle failure. For instance, if a connection fails because of a network error, it will be automatically reconnected if possible. Thus, sockets should not worry about the state of a given connection.
Message
An atomic arbitrary set of bytes owned by the ZeroMQ engine. ZeroMQ does not know how to interpret these bytes, only the user does. Messages are the units that are transferred between sockets.
Socket
A ZeroMQ construct used to send and receive messages using connections across endpoints. The specific behavior of the socket depends on its type.
High Water Mark
The message limit in the incoming or outgoing buffer. If the incoming buffer has reached this limit, the socket will stop receiving messages in the background. If the outgoing buffer has reached this limit, attempting to queue a message will block the calling thread. Conceptually, this is a socket's back throttling mechanism.
Context
A ZeroMQ context is a session that keeps track of all the sockets, the messages, the async threads and the internal queries.
Mechanism
A specific protocol used by sockets to authenticate and encrypt traffic.
Mute State
A socket that is in mute state is unable to queue and receive messages. This is likely because it has no peers. The condition for the mute state to occur depends on the socket type.