Reliable Request Reply
This is a basic example using the TCP transport, adapting the code from the previous Basic Request Reply example.
Note that this example does not make any attempt at security.
Since TCP is a connection-oriented transport, we have to take into account that the connection might fail at any time. We use heartbeating to detect failure, and also send and recv timeouts to prevent blocking forever.
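To make the heartbeat idea concrete, here is a minimal sketch of the liveness check that a heartbeat timeout implies. It uses only the standard library; `PeerLiveness` and its methods are hypothetical names for illustration and are not part of libzmq, which performs this bookkeeping internally.

```rust
use std::time::{Duration, Instant};

/// Hypothetical liveness tracker (illustration only, not a libzmq type).
struct PeerLiveness {
    /// Last moment any traffic (heartbeat or data) was seen from the peer.
    last_seen: Instant,
    /// How long the peer may stay silent before it is considered dead.
    timeout: Duration,
}

impl PeerLiveness {
    fn new(now: Instant, timeout: Duration) -> Self {
        Self { last_seen: now, timeout }
    }

    /// Record that traffic arrived from the peer at `now`.
    fn record_traffic(&mut self, now: Instant) {
        self.last_seen = now;
    }

    /// The peer counts as alive while its silence is shorter than the timeout.
    fn is_alive(&self, now: Instant) -> bool {
        now.duration_since(self.last_seen) < self.timeout
    }
}
```

With a 300 ms heartbeat interval and a timeout of three intervals, a peer that has been silent for two intervals is still considered alive, while one that has been silent for three is not.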
In this example, the server is protected against failures: it drops a message if it is unable to route it before send_timeout expires (WouldBlock), or if it detects via the heartbeats that the peer has disconnected (HostUnreachable).
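The drop policy described above can be isolated as a small decision function. This is a sketch only: `RouteError` and `Action` are stand-in enums invented for illustration, and just mirror the error kinds named in the text rather than the real libzmq types.

```rust
/// Stand-in for the error kinds named above (not the real libzmq type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RouteError {
    WouldBlock,
    HostUnreachable,
    InvalidCtx,
}

/// What the server loop does after a failed attempt to route a reply.
#[derive(Debug, PartialEq, Eq)]
enum Action {
    /// Forget the reply and keep serving other clients.
    DropMessage,
    /// Leave the loop and report the error.
    Stop,
}

fn on_route_error(err: RouteError) -> Action {
    match err {
        // The send timed out or the peer is gone: only this reply is lost.
        RouteError::WouldBlock | RouteError::HostUnreachable => Action::DropMessage,
        // Anything else is treated as fatal for the server loop.
        _ => Action::Stop,
    }
}
```

The point of the split is that a slow or dead client costs the server at most one send_timeout per reply, and never takes the whole loop down.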
The client in this case will simply fail if it is unable to send a request before the send_timeout expires, or unable to receive a reply before the recv_timeout expires (WouldBlock).
The client might then choose to retry later, connect to another server, and so on.
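One possible shape for the "retry later" option is a small helper that re-runs a fallible operation a bounded number of times. This is a sketch using only the standard library; `retry` is a hypothetical helper, not a libzmq function, and the fixed delay between attempts is an assumption made for simplicity.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical helper (not part of libzmq): run `op` up to `max_attempts`
/// times, sleeping `delay` after each failure, and return the first success
/// or the last error. `op` receives the 1-based attempt number.
fn retry<T, E>(
    max_attempts: u32,
    delay: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    assert!(max_attempts > 0, "at least one attempt is required");
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt == max_attempts => return Err(err),
            // Otherwise wait a little and try again.
            Err(_) => {
                thread::sleep(delay);
                attempt += 1;
            }
        }
    }
}
```

In the client from this example, the closure passed to `retry` would wrap the send and receive pair, so that a WouldBlock from either call triggers another attempt. Connecting to a different server on a later attempt could be driven by the attempt number.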

    use libzmq::{prelude::*, *};
    use std::{thread, time::Duration};

    fn main() -> Result<(), anyhow::Error> {
        // We use a system assigned port here.
        let addr: TcpAddr = "127.0.0.1:*".try_into()?;
        let duration = Duration::from_millis(300);

        let hb = Heartbeat::new(duration)
            .add_timeout(3 * duration)
            .add_ttl(3 * duration);

        let server = ServerBuilder::new()
            .bind(addr)
            .send_timeout(duration)
            .heartbeat(&hb)
            .build()?;

        // Retrieve the assigned port.
        let bound = server.last_endpoint()?;

        // Spawn the server thread. In a real application, this
        // would be on another node.
        let handle = thread::spawn(move || -> Result<(), Error> {
            use ErrorKind::*;
            loop {
                let request = server.recv_msg()?;
                assert_eq!(request.to_str(), Ok("ping"));

                // Retrieve the routing_id to route the reply to the client.
                let id = request.routing_id().unwrap();
                if let Err(err) = server.route("pong", id) {
                    match err.kind() {
                        // Cannot route msg, drop it.
                        WouldBlock | HostUnreachable => (),
                        _ => return Err(err.cast()),
                    }
                }
            }
        });

        let client = ClientBuilder::new()
            .connect(bound)
            .recv_timeout(duration)
            .send_timeout(duration)
            .heartbeat(hb)
            .build()?;

        // Do some request-reply work.
        client.send("ping")?;
        let msg = client.recv_msg()?;
        assert_eq!(msg.to_str(), Ok("pong"));

        // This will cause the server to fail with `InvalidCtx`.
        Ctx::global().shutdown();

        // Join with the thread.
        let err = handle.join().unwrap().unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidCtx);

        Ok(())
    }

    #[cfg(test)]
    mod tests {
        use super::main;

        #[test]
        fn main_runs() {
            main().unwrap();
        }
    }