Reliable Request Reply

This basic example uses the TCP transport and adapts the code from the previous Basic Request Reply example.

Note that this example does not make any attempt at security.

Since TCP is a connection-oriented transport, we have to take into account that the connection might fail at any time. We use heartbeating to detect failure, along with send and recv timeouts to avoid blocking forever.
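To make the effect of a recv timeout concrete, here is a minimal sketch that uses only the standard library's `TcpStream` rather than libzmq. The helper name `read_with_timeout` is hypothetical. It connects to a loopback peer that never sends anything and reports the error kind the read ends with once the timeout expires.

```rust
use std::io::{self, ErrorKind, Read};
use std::net::{TcpListener, TcpStream};
use std::time::Duration;

/// Hypothetical helper: reads from a silent peer and returns the kind of
/// error produced when the read timeout expires.
fn read_with_timeout(timeout: Duration) -> io::Result<ErrorKind> {
    // Port 0 asks the OS for a free port, similar to "127.0.0.1:*" in libzmq.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let mut stream = TcpStream::connect(addr)?;
    // Accept the connection but never write to it, so the read below
    // has nothing to receive.
    let (_silent_peer, _) = listener.accept()?;

    // Without this call, `read` would block until data arrives or the
    // connection closes.
    stream.set_read_timeout(Some(timeout))?;

    let mut buf = [0u8; 16];
    match stream.read(&mut buf) {
        Ok(n) => Err(io::Error::new(
            ErrorKind::Other,
            format!("unexpected read of {} bytes", n),
        )),
        Err(err) => Ok(err.kind()),
    }
}
```

With the standard library, the reported kind is platform dependent: Unix typically yields `WouldBlock`, while Windows may yield `TimedOut`. A timeout only bounds how long a call waits; it does not by itself tell you whether the peer is still there, which is why heartbeating is used alongside it.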

In this example, the server is protected against failures because it drops a message if it cannot route it before send_timeout expires (WouldBlock) or if the heartbeats reveal that the peer has disconnected (HostUnreachable).
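The idea behind heartbeat-based failure detection can be sketched with a small liveness tracker. This illustrates the concept only; it is not how libzmq implements heartbeats internally, and the `PeerLiveness` type and its methods are hypothetical names. A peer counts as alive as long as some traffic (a heartbeat or a regular message) was seen within the timeout.

```rust
use std::time::{Duration, Instant};

/// Hypothetical tracker: remembers when a peer was last heard from.
struct PeerLiveness {
    last_seen: Instant,
    timeout: Duration,
}

impl PeerLiveness {
    fn new(now: Instant, timeout: Duration) -> Self {
        PeerLiveness { last_seen: now, timeout }
    }

    /// Call this whenever a heartbeat or a message arrives from the peer.
    fn record_traffic(&mut self, now: Instant) {
        self.last_seen = now;
    }

    /// The peer is considered alive if it was heard from less than
    /// `timeout` ago.
    fn is_alive(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_seen) < self.timeout
    }
}
```

Passing `now` explicitly keeps the logic deterministic and easy to test. With a 300 ms heartbeat interval and a 900 ms timeout, a peer has to miss roughly three consecutive heartbeats before it is treated as unreachable.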

The client, in this case, simply fails if it is unable to send a request before send_timeout expires or to receive a reply before recv_timeout expires (WouldBlock in both cases). It may then choose to retry later, connect to another server, and so on.
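One way a client could react to such a timeout is to fail over to another server. The sketch below is transport agnostic and uses only the standard library: `request_with_failover` and the `try_request` closure are hypothetical names, and the closure stands in for whatever performs a single send and recv against one endpoint. Only `WouldBlock` triggers a failover; any other error is returned immediately.

```rust
use std::io::{Error, ErrorKind};

/// Hypothetical helper: tries each endpoint in order and returns the
/// first reply. A timeout (`WouldBlock`) moves on to the next endpoint,
/// while any other error aborts the whole attempt.
fn request_with_failover<F>(endpoints: &[&str], mut try_request: F) -> Result<String, Error>
where
    F: FnMut(&str) -> Result<String, Error>,
{
    let mut last_err = Error::new(ErrorKind::InvalidInput, "no endpoints given");
    for &endpoint in endpoints {
        match try_request(endpoint) {
            Ok(reply) => return Ok(reply),
            // Timed out: remember the error and try the next endpoint.
            Err(err) if err.kind() == ErrorKind::WouldBlock => last_err = err,
            // Anything else is treated as fatal.
            Err(err) => return Err(err),
        }
    }
    Err(last_err)
}
```

If every endpoint times out, the caller receives the last timeout error and can decide whether to wait and retry later.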

use libzmq::{prelude::*, *};

use std::{thread, time::Duration};

fn main() -> Result<(), anyhow::Error> {
    // We use a system assigned port here.
    let addr: TcpAddr = "127.0.0.1:*".try_into()?;
    let duration = Duration::from_millis(300);

    let hb = Heartbeat::new(duration)
        .add_timeout(3 * duration)
        .add_ttl(3 * duration);

    let server = ServerBuilder::new()
        .bind(addr)
        .send_timeout(duration)
        .heartbeat(&hb)
        .build()?;

    // Retrieve the assigned port.
    let bound = server.last_endpoint()?;

    // Spawn the server thread. In a real application, this
    // would be on another node.
    let handle = thread::spawn(move || -> Result<(), Error> {
        use ErrorKind::*;
        loop {
            let request = server.recv_msg()?;
            assert_eq!(request.to_str(), Ok("ping"));

            // Retrieve the routing_id to route the reply to the client.
            let id = request.routing_id().unwrap();
            if let Err(err) = server.route("pong", id) {
                match err.kind() {
                    // Cannot route msg, drop it.
                    WouldBlock | HostUnreachable => (),
                    _ => return Err(err.cast()),
                }
            }
        }
    });

    let client = ClientBuilder::new()
        .connect(bound)
        .recv_timeout(duration)
        .send_timeout(duration)
        .heartbeat(hb)
        .build()?;

    // Do some request-reply work.
    client.send("ping")?;
    let msg = client.recv_msg()?;
    assert_eq!(msg.to_str(), Ok("pong"));

    // This will cause the server to fail with `InvalidCtx`.
    Ctx::global().shutdown();

    // Join with the thread.
    let err = handle.join().unwrap().unwrap_err();
    assert_eq!(err.kind(), ErrorKind::InvalidCtx);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::main;

    #[test]
    fn main_runs() {
        main().unwrap();
    }
}