//! Admin IPC: a tiny JSON line protocol over a Unix domain socket (Unix) or a named pipe (Windows). //! //! A running `aura server` / `aura client` hosts a [`serve`] listener over a shared [`AdminState`] //! (the live `RouteTable`, a rule mirror, and tunnel [`Stats`]). The `aura route ...` and //! `aura status` subcommands connect to the same socket and exchange one JSON object per line: //! //! ```text //! -> {"cmd":"route_add","cidr":"8.8.8.0/24","action":"direct"} //! <- {"ok":true} //! -> {"cmd":"route_add","domain":"example.com","action":"vpn"} //! <- {"ok":true} //! -> {"cmd":"route_list"} //! <- {"ok":true,"default":"vpn","cidrs":[{"cidr":"8.8.8.0/24","action":"direct"}],"domains":[...]} //! -> {"cmd":"route_remove","cidr":"8.8.8.0/24"} //! <- {"ok":true,"removed":true} //! -> {"cmd":"status"} //! <- {"ok":true,"peer_id":"client-1","rx_packets":0,"tx_packets":0,"default":"vpn","rules":1} //! ``` //! //! On error a response is `{"ok":false,"error":"..."}`. //! //! ## Why a rule mirror //! The library [`RouteTable`] is the source of truth for *classification* but does not expose an //! iterator over its rules, so the admin layer keeps a parallel [`RuleMirror`] updated in lockstep. //! Every admin mutation touches both, so `route_list` can faithfully echo what is configured while //! `classify` still goes through the real table. //! //! ## Cross-platform transport //! The wire protocol is identical; only the per-platform stream type differs: //! //! * **Unix**: `tokio::net::UnixListener` / `UnixStream` over `/tmp/aura-admin.sock`. //! * **Windows**: `tokio::net::windows::named_pipe::{NamedPipeServer, NamedPipeClient}` over //! `\\.\pipe\aura-admin`. The standard Tokio pattern is to rebuild a fresh `ServerOptions` //! instance after every accept so subsequent clients can also connect. //! //! See [`transport`] for the platform-specific listen/connect glue. The handler ([`handle_request`]) //! and the wire types are platform-agnostic. use std::collections::BTreeMap; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex as StdMutex}; use aura_tunnel::{PacketCounters, RouteAction, RouteTable}; use ipnetwork::IpNetwork; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::sync::RwLock; use crate::config::parse_action; /// Default admin transport endpoint used when a config / flag does not override it. On Unix this /// is a filesystem path under `/tmp`; on Windows it is a named pipe path under `\\.\pipe\`. #[cfg(unix)] pub const DEFAULT_SOCKET: &str = "/tmp/aura-admin.sock"; /// Default admin transport endpoint on Windows: a named pipe in the local pipe namespace. #[cfg(windows)] pub const DEFAULT_SOCKET: &str = r"\\.\pipe\aura-admin"; /// Live tunnel statistics shared between the data path and the admin listener. /// /// The two packet counters are `Arc` so the same atomics can be cloned into the /// [`aura_tunnel::AuraRouter`] (via [`Stats::counters`]) and bumped from the data path. The admin /// `Status` handler reads them through this struct; `aura status` sees live numbers because both /// sides are looking at the same memory. #[derive(Debug, Default)] pub struct Stats { /// Packets received from the peer (inbound, toward the TUN). pub rx_packets: Arc, /// Packets sent to the peer (outbound, from the TUN). pub tx_packets: Arc, /// Verified peer identity, set once a connection is established. pub peer_id: StdMutex>, } impl Stats { /// Create a zeroed stats block. pub fn new() -> Self { Self::default() } /// Record the verified peer identity. pub fn set_peer_id(&self, id: Option) { if let Ok(mut g) = self.peer_id.lock() { *g = id; } } /// Hand out a [`PacketCounters`] handle pointing at the same `tx`/`rx` atomics. /// /// The CLI passes this into [`aura_tunnel::AuraRouter::with_stats`] / the per-client server /// router so the data path bumps the same counters the admin `Status` handler reads. pub fn counters(&self) -> PacketCounters { PacketCounters { tx: Arc::clone(&self.tx_packets), rx: Arc::clone(&self.rx_packets), } } } /// A parallel record of admin-configured rules, so `route_list` can enumerate them (the library /// [`RouteTable`] does not expose iteration). Kept in lockstep with the table. #[derive(Debug, Default)] pub struct RuleMirror { /// CIDR rules, ordered for stable listing. pub cidrs: StdMutex>, /// Domain rules, ordered for stable listing. pub domains: StdMutex>, } impl RuleMirror { /// Build a mirror pre-populated from an existing table snapshot's rules. /// /// The constructor takes already-extracted rule lists (the config layer has them at build /// time) so the mirror starts consistent with the table the data path was given. pub fn from_rules( cidrs: impl IntoIterator, domains: impl IntoIterator, ) -> Self { Self { cidrs: StdMutex::new(cidrs.into_iter().collect()), domains: StdMutex::new(domains.into_iter().collect()), } } } /// Shared state the admin listener operates on. #[derive(Clone)] pub struct AdminState { /// The live split-tunnel routing table (classification source of truth). pub routes: Arc>, /// Mirror of configured rules for enumeration. pub mirror: Arc, /// Live tunnel statistics. pub stats: Arc, } impl AdminState { /// Construct admin state from a shared table and stats, seeding the mirror from the given rules. pub fn new( routes: Arc>, stats: Arc, cidrs: impl IntoIterator, domains: impl IntoIterator, ) -> Self { Self { routes, mirror: Arc::new(RuleMirror::from_rules(cidrs, domains)), stats, } } } // ---- wire protocol --------------------------------------------------------------------------- /// A request from the `aura route` / `aura status` client. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "cmd", rename_all = "snake_case")] pub enum Request { /// Add a CIDR or domain rule. RouteAdd { /// CIDR to add (mutually exclusive with `domain`). #[serde(default, skip_serializing_if = "Option::is_none")] cidr: Option, /// Domain to add (mutually exclusive with `cidr`). #[serde(default, skip_serializing_if = "Option::is_none")] domain: Option, /// Action: `"vpn"` or `"direct"`. action: String, }, /// List all rules and the default action. RouteList, /// Remove a CIDR rule (by exact network). RouteRemove { /// CIDR to remove. cidr: String, }, /// Query tunnel statistics. Status, } /// One CIDR rule in a `route_list` response. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct CidrEntry { /// The CIDR network. pub cidr: String, /// The action applied to it. pub action: String, } /// One domain rule in a `route_list` response. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct DomainEntry { /// The domain. pub domain: String, /// The action applied to it. pub action: String, } /// A response to a [`Request`]. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Response { /// Whether the command succeeded. pub ok: bool, /// Error message when `ok` is false. #[serde(default, skip_serializing_if = "Option::is_none")] pub error: Option, /// Default action (route_list / status). #[serde(default, skip_serializing_if = "Option::is_none")] pub default: Option, /// CIDR rules (route_list). #[serde(default, skip_serializing_if = "Option::is_none")] pub cidrs: Option>, /// Domain rules (route_list). #[serde(default, skip_serializing_if = "Option::is_none")] pub domains: Option>, /// Whether a `route_remove` actually removed something. #[serde(default, skip_serializing_if = "Option::is_none")] pub removed: Option, /// Verified peer id (status). #[serde(default, skip_serializing_if = "Option::is_none")] pub peer_id: Option, /// Inbound packet count (status). #[serde(default, skip_serializing_if = "Option::is_none")] pub rx_packets: Option, /// Outbound packet count (status). #[serde(default, skip_serializing_if = "Option::is_none")] pub tx_packets: Option, /// Total rule count (status). #[serde(default, skip_serializing_if = "Option::is_none")] pub rules: Option, } impl Response { /// A bare success response. pub fn ok() -> Self { Self { ok: true, error: None, default: None, cidrs: None, domains: None, removed: None, peer_id: None, rx_packets: None, tx_packets: None, rules: None, } } /// An error response carrying `msg`. pub fn err(msg: impl Into) -> Self { Self { ok: false, error: Some(msg.into()), ..Self::ok() } } } /// Render the action string for the wire. fn action_str(a: RouteAction) -> &'static str { match a { RouteAction::Vpn => "vpn", RouteAction::Direct => "direct", } } /// Apply a single request against the shared state and produce a response. /// /// Factored out from the socket I/O so it is directly unit-testable. pub async fn handle_request(state: &AdminState, req: Request) -> Response { match req { Request::RouteAdd { cidr, domain, action, } => { let action = match parse_action(&action) { Ok(a) => a, Err(e) => return Response::err(e.to_string()), }; match (cidr, domain) { (Some(cidr), None) => match cidr.parse::() { Ok(net) => { state.routes.write().await.add_cidr(net, action); if let Ok(mut m) = state.mirror.cidrs.lock() { m.insert(net, action); } Response::ok() } Err(e) => Response::err(format!("invalid cidr '{cidr}': {e}")), }, (None, Some(domain)) => { state.routes.write().await.add_domain(&domain, action); if let Ok(mut m) = state.mirror.domains.lock() { m.insert(domain, action); } Response::ok() } (Some(_), Some(_)) => { Response::err("specify exactly one of 'cidr' or 'domain', not both") } (None, None) => Response::err("specify exactly one of 'cidr' or 'domain'"), } } Request::RouteList => { let default = action_str(state.routes.read().await.default_action()).to_string(); let cidrs = state .mirror .cidrs .lock() .map(|m| { m.iter() .map(|(net, a)| CidrEntry { cidr: net.to_string(), action: action_str(*a).to_string(), }) .collect() }) .unwrap_or_default(); let domains = state .mirror .domains .lock() .map(|m| { m.iter() .map(|(d, a)| DomainEntry { domain: d.clone(), action: action_str(*a).to_string(), }) .collect() }) .unwrap_or_default(); Response { default: Some(default), cidrs: Some(cidrs), domains: Some(domains), ..Response::ok() } } Request::RouteRemove { cidr } => match cidr.parse::() { Ok(net) => { // Removing from the live table requires rebuilding it (no remove API), preserving // the default and every other rule from the mirror. let removed = state .mirror .cidrs .lock() .map(|mut m| m.remove(&net).is_some()) .unwrap_or(false); if removed { rebuild_table(state).await; } Response { removed: Some(removed), ..Response::ok() } } Err(e) => Response::err(format!("invalid cidr '{cidr}': {e}")), }, Request::Status => { let default = action_str(state.routes.read().await.default_action()).to_string(); let rules = state.mirror.cidrs.lock().map(|m| m.len()).unwrap_or(0) + state.mirror.domains.lock().map(|m| m.len()).unwrap_or(0); let peer_id = state.stats.peer_id.lock().ok().and_then(|g| g.clone()); Response { default: Some(default), peer_id, rx_packets: Some(state.stats.rx_packets.load(Ordering::Relaxed)), tx_packets: Some(state.stats.tx_packets.load(Ordering::Relaxed)), rules: Some(rules), ..Response::ok() } } } } /// Rebuild the live [`RouteTable`] from the mirror (used after a `route_remove`, since the library /// table has no per-rule removal). The default action is preserved; domain rules are re-added (their /// previously resolved host routes are dropped and will be re-resolved on demand — acceptable for an /// admin remove in v1). async fn rebuild_table(state: &AdminState) { let default = state.routes.read().await.default_action(); let mut fresh = RouteTable::new(default); if let Ok(m) = state.mirror.cidrs.lock() { for (net, a) in m.iter() { fresh.add_cidr(*net, *a); } } if let Ok(m) = state.mirror.domains.lock() { for (d, a) in m.iter() { fresh.add_domain(d, *a); } } *state.routes.write().await = fresh; } // ---- platform transport --------------------------------------------------------------------- mod transport { //! Platform glue for the admin transport. The Unix and Windows variants present the same //! `listen` / `connect` interface so [`super::serve`] / [`super::request`] can be written //! once over `AsyncRead + AsyncWrite` streams. #[cfg(unix)] pub use self::unix::{accept, connect, listen}; #[cfg(windows)] pub use self::windows::{accept, connect, listen}; #[cfg(unix)] mod unix { use std::io; use tokio::net::{UnixListener, UnixStream}; /// Bind a Unix domain socket at `path`, removing any stale socket file first. pub fn listen(path: &str) -> io::Result { let _ = std::fs::remove_file(path); UnixListener::bind(path) } /// Accept the next admin client. Returns the stream half on success. pub async fn accept(listener: &UnixListener) -> io::Result { let (stream, _addr) = listener.accept().await?; Ok(stream) } /// Connect to a Unix domain socket at `path`. pub async fn connect(path: &str) -> io::Result { UnixStream::connect(path).await } } #[cfg(windows)] mod windows { //! Windows transport: named pipe in the local namespace (`\\.\pipe\`). //! //! Tokio's `NamedPipeServer` represents one already-bound endpoint. The standard accept //! pattern is: //! //! 1. Build one endpoint with `ServerOptions::new().first_pipe_instance(true).create(path)`. //! 2. `connect().await` to wait for a client to open the pipe. //! 3. *Before* serving the request, build a fresh endpoint via the same options so the //! next client has somewhere to connect — otherwise the namespace entry disappears //! once we hand the current instance off to the request handler. //! //! We model that as a [`Listener`] wrapper that owns the latest "pending" instance plus //! the `ServerOptions` template. use std::io; use tokio::net::windows::named_pipe::{ ClientOptions, NamedPipeClient, NamedPipeServer, ServerOptions, }; use tokio::time::{sleep, Duration}; /// Named-pipe listener. Owns the next-to-be-connected instance. pub struct Listener { path: String, pending: NamedPipeServer, } /// Create the initial pipe instance and wrap it in a [`Listener`]. pub fn listen(path: &str) -> io::Result { let pending = ServerOptions::new() .first_pipe_instance(true) .create(path)?; Ok(Listener { path: path.to_string(), pending, }) } /// Wait for a client, then rebuild the pending instance so subsequent clients can also /// connect; return the now-connected server endpoint. pub async fn accept(listener: &mut Listener) -> io::Result { listener.pending.connect().await?; // Rotate: keep the connected instance to return, replace `pending` with a fresh one. let next = ServerOptions::new().create(&listener.path)?; let connected = std::mem::replace(&mut listener.pending, next); Ok(connected) } /// Connect to a named pipe at `path`. Retries briefly on `ERROR_PIPE_BUSY` (the kernel /// returns this when every server instance is busy answering another client; a short /// pause + retry is the documented idiom). pub async fn connect(path: &str) -> io::Result { // ERROR_PIPE_BUSY = 231. const PIPE_BUSY: i32 = 231; for _ in 0..50 { match ClientOptions::new().open(path) { Ok(c) => return Ok(c), Err(e) if e.raw_os_error() == Some(PIPE_BUSY) => { sleep(Duration::from_millis(20)).await; } Err(e) => return Err(e), } } // One last attempt; if it still fails surface the underlying error. ClientOptions::new().open(path) } } } /// Run the admin listener until the task is cancelled. /// /// Binds the platform listener at `path` and serves one request/response per accepted line over /// the shared `state`. On Unix this is a Unix domain socket; on Windows this is a named pipe. pub async fn serve(path: &str, state: AdminState) -> anyhow::Result<()> { #[cfg(unix)] { let listener = transport::listen(path) .map_err(|e| anyhow::anyhow!("binding admin socket {path}: {e}"))?; tracing::info!(socket = path, "admin IPC listening"); loop { let stream = match transport::accept(&listener).await { Ok(s) => s, Err(e) => { tracing::warn!(error = %e, "admin accept failed"); continue; } }; let state_clone = state.clone(); tokio::spawn(async move { let (read_half, write_half) = stream.into_split(); serve_connection(read_half, write_half, state_clone).await; }); } } #[cfg(windows)] { let mut listener = transport::listen(path) .map_err(|e| anyhow::anyhow!("binding admin pipe {path}: {e}"))?; tracing::info!(pipe = path, "admin IPC listening"); loop { let stream = match transport::accept(&mut listener).await { Ok(s) => s, Err(e) => { tracing::warn!(error = %e, "admin pipe accept failed"); continue; } }; let state_clone = state.clone(); // The Tokio NamedPipeServer implements AsyncRead + AsyncWrite directly; we cannot // `into_split` it the way we do with UnixStream, so wrap it in tokio::io::split. tokio::spawn(async move { let (read_half, write_half) = tokio::io::split(stream); serve_connection(read_half, write_half, state_clone).await; }); } } #[cfg(not(any(unix, windows)))] { let _ = (path, state); anyhow::bail!("admin IPC is not supported on this platform (need unix sockets or windows named pipes)") } } /// Common per-connection loop: read one JSON-line request, write one JSON-line response, repeat /// until the client disconnects. async fn serve_connection(read_half: R, mut write_half: W, state: AdminState) where R: tokio::io::AsyncRead + Unpin, W: tokio::io::AsyncWrite + Unpin, { let mut lines = BufReader::new(read_half).lines(); while let Ok(Some(line)) = lines.next_line().await { if line.trim().is_empty() { continue; } let resp = match serde_json::from_str::(&line) { Ok(req) => handle_request(&state, req).await, Err(e) => Response::err(format!("bad request: {e}")), }; let mut buf = serde_json::to_vec(&resp) .unwrap_or_else(|_| b"{\"ok\":false,\"error\":\"serialize failed\"}".to_vec()); buf.push(b'\n'); if write_half.write_all(&buf).await.is_err() { break; } } } /// Connect to the admin transport, send one [`Request`], and return the [`Response`]. pub async fn request(path: &str, req: &Request) -> anyhow::Result { #[cfg(unix)] { let stream = transport::connect(path).await.map_err(|e| { anyhow::anyhow!( "connecting to admin socket {path}: {e} (is `aura server`/`aura client` running?)" ) })?; let (read_half, write_half) = stream.into_split(); return request_over(read_half, write_half, req).await; } #[cfg(windows)] { let stream = transport::connect(path).await.map_err(|e| { anyhow::anyhow!( "connecting to admin pipe {path}: {e} (is `aura server`/`aura client` running?)" ) })?; let (read_half, write_half) = tokio::io::split(stream); return request_over(read_half, write_half, req).await; } #[cfg(not(any(unix, windows)))] { let _ = (path, req); anyhow::bail!("admin IPC is not supported on this platform") } } /// Generic request/response over any split stream. async fn request_over( read_half: R, mut write_half: W, req: &Request, ) -> anyhow::Result where R: tokio::io::AsyncRead + Unpin, W: tokio::io::AsyncWrite + Unpin, { let mut buf = serde_json::to_vec(req)?; buf.push(b'\n'); write_half.write_all(&buf).await?; write_half.flush().await?; let mut lines = BufReader::new(read_half).lines(); let line = lines .next_line() .await? .ok_or_else(|| anyhow::anyhow!("admin socket closed without a response"))?; Ok(serde_json::from_str(&line)?) } #[cfg(test)] mod tests { use super::*; fn state() -> AdminState { AdminState::new( Arc::new(RwLock::new(RouteTable::new(RouteAction::Vpn))), Arc::new(Stats::new()), std::iter::empty(), std::iter::empty(), ) } #[tokio::test] async fn route_add_cidr_then_classify_and_list() { let st = state(); let resp = handle_request( &st, Request::RouteAdd { cidr: Some("8.8.8.0/24".into()), domain: None, action: "direct".into(), }, ) .await; assert!(resp.ok, "route_add should succeed: {:?}", resp.error); assert_eq!( st.routes.read().await.classify("8.8.8.8".parse().unwrap()), RouteAction::Direct ); let list = handle_request(&st, Request::RouteList).await; assert_eq!(list.default.as_deref(), Some("vpn")); let cidrs = list.cidrs.unwrap(); assert_eq!(cidrs.len(), 1); assert_eq!(cidrs[0].cidr, "8.8.8.0/24"); assert_eq!(cidrs[0].action, "direct"); } #[tokio::test] async fn route_remove_updates_table_and_mirror() { let st = state(); for cidr in ["8.8.8.0/24", "1.1.1.0/24"] { handle_request( &st, Request::RouteAdd { cidr: Some(cidr.into()), domain: None, action: "direct".into(), }, ) .await; } let resp = handle_request( &st, Request::RouteRemove { cidr: "8.8.8.0/24".into(), }, ) .await; assert_eq!(resp.removed, Some(true)); // The removed rule no longer classifies as Direct (falls back to default VPN). assert_eq!( st.routes.read().await.classify("8.8.8.8".parse().unwrap()), RouteAction::Vpn ); // The other rule survives. assert_eq!( st.routes.read().await.classify("1.1.1.1".parse().unwrap()), RouteAction::Direct ); let list = handle_request(&st, Request::RouteList).await; assert_eq!(list.cidrs.unwrap().len(), 1); } #[tokio::test] async fn route_add_rejects_bad_cidr() { let st = state(); let resp = handle_request( &st, Request::RouteAdd { cidr: Some("not-a-cidr".into()), domain: None, action: "vpn".into(), }, ) .await; assert!(!resp.ok); } #[tokio::test] async fn status_reports_default_and_counters() { let st = state(); st.stats.tx_packets.store(5, Ordering::Relaxed); st.stats.set_peer_id(Some("client-9".into())); let resp = handle_request(&st, Request::Status).await; assert!(resp.ok); assert_eq!(resp.default.as_deref(), Some("vpn")); assert_eq!(resp.tx_packets, Some(5)); assert_eq!(resp.peer_id.as_deref(), Some("client-9")); } /// The platform-default endpoint is set correctly for each target. (Inspection-only on /// non-host platforms; the cfg picks the right constant at compile time.) #[test] fn default_socket_const_is_platform_appropriate() { #[cfg(unix)] assert_eq!(DEFAULT_SOCKET, "/tmp/aura-admin.sock"); #[cfg(windows)] assert_eq!(DEFAULT_SOCKET, r"\\.\pipe\aura-admin"); } }