feat(cli): implement Wave 4 — aura binary (PKI, server/client, admin, bench)

aura-cli: clap command tree (pki init/issue-server/issue-client/revoke/list,
server, client, route add/list/remove, status, bench-crypto); TOML config with
~ expansion and split-tunnel rules -> RouteTable; JSON-over-Unix-socket admin
IPC; server/client data paths wiring transport + tunnel (TUN run needs root).
config/{server,client}.toml.example. 15 tests (pki roundtrip, config parse,
admin-socket roundtrip, loopback connection). Verified the real binary: --help,
bench-crypto, and a full CA->server->client cert workflow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xah30
2026-05-25 18:36:13 +03:00
parent c19a6c5586
commit cb89312a27
15 changed files with 2379 additions and 3 deletions
+555
View File
@@ -0,0 +1,555 @@
//! Admin IPC: a tiny JSON line protocol over a Unix domain socket.
//!
//! A running `aura server` / `aura client` hosts a [`serve`] listener over a shared [`AdminState`]
//! (the live `RouteTable`, a rule mirror, and tunnel [`Stats`]). The `aura route ...` and
//! `aura status` subcommands connect to the same socket and exchange one JSON object per line:
//!
//! ```text
//! -> {"cmd":"route_add","cidr":"8.8.8.0/24","action":"direct"}
//! <- {"ok":true}
//! -> {"cmd":"route_add","domain":"example.com","action":"vpn"}
//! <- {"ok":true}
//! -> {"cmd":"route_list"}
//! <- {"ok":true,"default":"vpn","cidrs":[{"cidr":"8.8.8.0/24","action":"direct"}],"domains":[...]}
//! -> {"cmd":"route_remove","cidr":"8.8.8.0/24"}
//! <- {"ok":true,"removed":true}
//! -> {"cmd":"status"}
//! <- {"ok":true,"peer_id":"client-1","rx_packets":0,"tx_packets":0,"default":"vpn","rules":1}
//! ```
//!
//! On error a response is `{"ok":false,"error":"..."}`.
//!
//! ## Why a rule mirror
//! The library [`RouteTable`] is the source of truth for *classification* but does not expose an
//! iterator over its rules, so the admin layer keeps a parallel [`RuleMirror`] updated in lockstep.
//! Every admin mutation touches both, so `route_list` can faithfully echo what is configured while
//! `classify` still goes through the real table.
//!
//! ## Platform note
//! The transport is `tokio::net::UnixListener` / `UnixStream`, available on Unix (the project's
//! Linux + macOS targets). On Windows this would be a named pipe; that path is a documented
//! `cfg`-gated stub ([`serve`] / [`request`] return an explanatory error) so the rest of the CLI
//! still compiles there.
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex as StdMutex};
use aura_tunnel::{RouteAction, RouteTable};
use ipnetwork::IpNetwork;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use crate::config::parse_action;
/// Default admin socket path used when a config / flag does not override it.
pub const DEFAULT_SOCKET: &str = "/tmp/aura-admin.sock";
/// Live tunnel statistics shared between the data path and the admin listener.
#[derive(Debug, Default)]
pub struct Stats {
/// Packets received from the peer (inbound, toward the TUN).
pub rx_packets: AtomicU64,
/// Packets sent to the peer (outbound, from the TUN).
pub tx_packets: AtomicU64,
/// Verified peer identity, set once a connection is established.
pub peer_id: StdMutex<Option<String>>,
}
impl Stats {
/// Create a zeroed stats block.
pub fn new() -> Self {
Self::default()
}
/// Record the verified peer identity.
pub fn set_peer_id(&self, id: Option<String>) {
if let Ok(mut g) = self.peer_id.lock() {
*g = id;
}
}
}
/// A parallel record of admin-configured rules, so `route_list` can enumerate them (the library
/// [`RouteTable`] does not expose iteration). Kept in lockstep with the table.
#[derive(Debug, Default)]
pub struct RuleMirror {
/// CIDR rules, ordered for stable listing.
pub cidrs: StdMutex<BTreeMap<IpNetwork, RouteAction>>,
/// Domain rules, ordered for stable listing.
pub domains: StdMutex<BTreeMap<String, RouteAction>>,
}
impl RuleMirror {
/// Build a mirror pre-populated from an existing table snapshot's rules.
///
/// The constructor takes already-extracted rule lists (the config layer has them at build
/// time) so the mirror starts consistent with the table the data path was given.
pub fn from_rules(
cidrs: impl IntoIterator<Item = (IpNetwork, RouteAction)>,
domains: impl IntoIterator<Item = (String, RouteAction)>,
) -> Self {
Self {
cidrs: StdMutex::new(cidrs.into_iter().collect()),
domains: StdMutex::new(domains.into_iter().collect()),
}
}
}
/// Shared state the admin listener operates on.
#[derive(Clone)]
pub struct AdminState {
/// The live split-tunnel routing table (classification source of truth).
pub routes: Arc<RwLock<RouteTable>>,
/// Mirror of configured rules for enumeration.
pub mirror: Arc<RuleMirror>,
/// Live tunnel statistics.
pub stats: Arc<Stats>,
}
impl AdminState {
/// Construct admin state from a shared table and stats, seeding the mirror from the given rules.
pub fn new(
routes: Arc<RwLock<RouteTable>>,
stats: Arc<Stats>,
cidrs: impl IntoIterator<Item = (IpNetwork, RouteAction)>,
domains: impl IntoIterator<Item = (String, RouteAction)>,
) -> Self {
Self {
routes,
mirror: Arc::new(RuleMirror::from_rules(cidrs, domains)),
stats,
}
}
}
// ---- wire protocol ---------------------------------------------------------------------------
/// A request from the `aura route` / `aura status` client.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "cmd", rename_all = "snake_case")]
pub enum Request {
/// Add a CIDR or domain rule.
RouteAdd {
/// CIDR to add (mutually exclusive with `domain`).
#[serde(default, skip_serializing_if = "Option::is_none")]
cidr: Option<String>,
/// Domain to add (mutually exclusive with `cidr`).
#[serde(default, skip_serializing_if = "Option::is_none")]
domain: Option<String>,
/// Action: `"vpn"` or `"direct"`.
action: String,
},
/// List all rules and the default action.
RouteList,
/// Remove a CIDR rule (by exact network).
RouteRemove {
/// CIDR to remove.
cidr: String,
},
/// Query tunnel statistics.
Status,
}
/// One CIDR rule in a `route_list` response.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CidrEntry {
/// The CIDR network.
pub cidr: String,
/// The action applied to it.
pub action: String,
}
/// One domain rule in a `route_list` response.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DomainEntry {
/// The domain.
pub domain: String,
/// The action applied to it.
pub action: String,
}
/// A response to a [`Request`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Response {
/// Whether the command succeeded.
pub ok: bool,
/// Error message when `ok` is false.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
/// Default action (route_list / status).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub default: Option<String>,
/// CIDR rules (route_list).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub cidrs: Option<Vec<CidrEntry>>,
/// Domain rules (route_list).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub domains: Option<Vec<DomainEntry>>,
/// Whether a `route_remove` actually removed something.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub removed: Option<bool>,
/// Verified peer id (status).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub peer_id: Option<String>,
/// Inbound packet count (status).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rx_packets: Option<u64>,
/// Outbound packet count (status).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tx_packets: Option<u64>,
/// Total rule count (status).
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rules: Option<usize>,
}
impl Response {
/// A bare success response.
pub fn ok() -> Self {
Self {
ok: true,
error: None,
default: None,
cidrs: None,
domains: None,
removed: None,
peer_id: None,
rx_packets: None,
tx_packets: None,
rules: None,
}
}
/// An error response carrying `msg`.
pub fn err(msg: impl Into<String>) -> Self {
Self {
ok: false,
error: Some(msg.into()),
..Self::ok()
}
}
}
/// Render the action string for the wire.
fn action_str(a: RouteAction) -> &'static str {
match a {
RouteAction::Vpn => "vpn",
RouteAction::Direct => "direct",
}
}
/// Apply a single request against the shared state and produce a response.
///
/// Factored out from the socket I/O so it is directly unit-testable.
pub async fn handle_request(state: &AdminState, req: Request) -> Response {
match req {
Request::RouteAdd {
cidr,
domain,
action,
} => {
let action = match parse_action(&action) {
Ok(a) => a,
Err(e) => return Response::err(e.to_string()),
};
match (cidr, domain) {
(Some(cidr), None) => match cidr.parse::<IpNetwork>() {
Ok(net) => {
state.routes.write().await.add_cidr(net, action);
if let Ok(mut m) = state.mirror.cidrs.lock() {
m.insert(net, action);
}
Response::ok()
}
Err(e) => Response::err(format!("invalid cidr '{cidr}': {e}")),
},
(None, Some(domain)) => {
state.routes.write().await.add_domain(&domain, action);
if let Ok(mut m) = state.mirror.domains.lock() {
m.insert(domain, action);
}
Response::ok()
}
(Some(_), Some(_)) => {
Response::err("specify exactly one of 'cidr' or 'domain', not both")
}
(None, None) => Response::err("specify exactly one of 'cidr' or 'domain'"),
}
}
Request::RouteList => {
let default = action_str(state.routes.read().await.default_action()).to_string();
let cidrs = state
.mirror
.cidrs
.lock()
.map(|m| {
m.iter()
.map(|(net, a)| CidrEntry {
cidr: net.to_string(),
action: action_str(*a).to_string(),
})
.collect()
})
.unwrap_or_default();
let domains = state
.mirror
.domains
.lock()
.map(|m| {
m.iter()
.map(|(d, a)| DomainEntry {
domain: d.clone(),
action: action_str(*a).to_string(),
})
.collect()
})
.unwrap_or_default();
Response {
default: Some(default),
cidrs: Some(cidrs),
domains: Some(domains),
..Response::ok()
}
}
Request::RouteRemove { cidr } => match cidr.parse::<IpNetwork>() {
Ok(net) => {
// Removing from the live table requires rebuilding it (no remove API), preserving
// the default and every other rule from the mirror.
let removed = state
.mirror
.cidrs
.lock()
.map(|mut m| m.remove(&net).is_some())
.unwrap_or(false);
if removed {
rebuild_table(state).await;
}
Response {
removed: Some(removed),
..Response::ok()
}
}
Err(e) => Response::err(format!("invalid cidr '{cidr}': {e}")),
},
Request::Status => {
let default = action_str(state.routes.read().await.default_action()).to_string();
let rules = state.mirror.cidrs.lock().map(|m| m.len()).unwrap_or(0)
+ state.mirror.domains.lock().map(|m| m.len()).unwrap_or(0);
let peer_id = state.stats.peer_id.lock().ok().and_then(|g| g.clone());
Response {
default: Some(default),
peer_id,
rx_packets: Some(state.stats.rx_packets.load(Ordering::Relaxed)),
tx_packets: Some(state.stats.tx_packets.load(Ordering::Relaxed)),
rules: Some(rules),
..Response::ok()
}
}
}
}
/// Rebuild the live [`RouteTable`] from the mirror (used after a `route_remove`, since the library
/// table has no per-rule removal). The default action is preserved; domain rules are re-added (their
/// previously resolved host routes are dropped and will be re-resolved on demand — acceptable for an
/// admin remove in v1).
async fn rebuild_table(state: &AdminState) {
let default = state.routes.read().await.default_action();
let mut fresh = RouteTable::new(default);
if let Ok(m) = state.mirror.cidrs.lock() {
for (net, a) in m.iter() {
fresh.add_cidr(*net, *a);
}
}
if let Ok(m) = state.mirror.domains.lock() {
for (d, a) in m.iter() {
fresh.add_domain(d, *a);
}
}
*state.routes.write().await = fresh;
}
/// Run the admin listener until the task is cancelled.
///
/// Removes any stale socket at `path`, binds a [`tokio::net::UnixListener`], and serves connections
/// (one request/response per accepted line) over the shared `state`.
#[cfg(unix)]
pub async fn serve(path: &str, state: AdminState) -> anyhow::Result<()> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
// Best-effort cleanup of a previous run's socket file.
let _ = std::fs::remove_file(path);
let listener = UnixListener::bind(path)
.map_err(|e| anyhow::anyhow!("binding admin socket {path}: {e}"))?;
tracing::info!(socket = path, "admin IPC listening");
loop {
let (stream, _addr) = match listener.accept().await {
Ok(pair) => pair,
Err(e) => {
tracing::warn!(error = %e, "admin accept failed");
continue;
}
};
let state = state.clone();
tokio::spawn(async move {
let (read_half, mut write_half) = stream.into_split();
let mut lines = BufReader::new(read_half).lines();
while let Ok(Some(line)) = lines.next_line().await {
if line.trim().is_empty() {
continue;
}
let resp = match serde_json::from_str::<Request>(&line) {
Ok(req) => handle_request(&state, req).await,
Err(e) => Response::err(format!("bad request: {e}")),
};
let mut buf = serde_json::to_vec(&resp)
.unwrap_or_else(|_| b"{\"ok\":false,\"error\":\"serialize failed\"}".to_vec());
buf.push(b'\n');
if write_half.write_all(&buf).await.is_err() {
break;
}
}
});
}
}
/// Windows stub: the admin socket uses Unix domain sockets; a named-pipe transport is future work.
#[cfg(not(unix))]
pub async fn serve(_path: &str, _state: AdminState) -> anyhow::Result<()> {
anyhow::bail!("admin IPC over Unix sockets is unavailable on this platform (Windows named-pipe transport is not yet implemented)")
}
/// Connect to the admin socket, send one [`Request`], and return the [`Response`].
#[cfg(unix)]
pub async fn request(path: &str, req: &Request) -> anyhow::Result<Response> {
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixStream;
let stream = UnixStream::connect(path).await.map_err(|e| {
anyhow::anyhow!(
"connecting to admin socket {path}: {e} (is `aura server`/`aura client` running?)"
)
})?;
let (read_half, mut write_half) = stream.into_split();
let mut buf = serde_json::to_vec(req)?;
buf.push(b'\n');
write_half.write_all(&buf).await?;
write_half.flush().await?;
let mut lines = BufReader::new(read_half).lines();
let line = lines
.next_line()
.await?
.ok_or_else(|| anyhow::anyhow!("admin socket closed without a response"))?;
Ok(serde_json::from_str(&line)?)
}
/// Windows stub mirroring [`serve`].
#[cfg(not(unix))]
pub async fn request(_path: &str, _req: &Request) -> anyhow::Result<Response> {
anyhow::bail!("admin IPC over Unix sockets is unavailable on this platform (Windows named-pipe transport is not yet implemented)")
}
#[cfg(test)]
mod tests {
use super::*;
fn state() -> AdminState {
AdminState::new(
Arc::new(RwLock::new(RouteTable::new(RouteAction::Vpn))),
Arc::new(Stats::new()),
std::iter::empty(),
std::iter::empty(),
)
}
#[tokio::test]
async fn route_add_cidr_then_classify_and_list() {
let st = state();
let resp = handle_request(
&st,
Request::RouteAdd {
cidr: Some("8.8.8.0/24".into()),
domain: None,
action: "direct".into(),
},
)
.await;
assert!(resp.ok, "route_add should succeed: {:?}", resp.error);
assert_eq!(
st.routes.read().await.classify("8.8.8.8".parse().unwrap()),
RouteAction::Direct
);
let list = handle_request(&st, Request::RouteList).await;
assert_eq!(list.default.as_deref(), Some("vpn"));
let cidrs = list.cidrs.unwrap();
assert_eq!(cidrs.len(), 1);
assert_eq!(cidrs[0].cidr, "8.8.8.0/24");
assert_eq!(cidrs[0].action, "direct");
}
#[tokio::test]
async fn route_remove_updates_table_and_mirror() {
let st = state();
for cidr in ["8.8.8.0/24", "1.1.1.0/24"] {
handle_request(
&st,
Request::RouteAdd {
cidr: Some(cidr.into()),
domain: None,
action: "direct".into(),
},
)
.await;
}
let resp = handle_request(
&st,
Request::RouteRemove {
cidr: "8.8.8.0/24".into(),
},
)
.await;
assert_eq!(resp.removed, Some(true));
// The removed rule no longer classifies as Direct (falls back to default VPN).
assert_eq!(
st.routes.read().await.classify("8.8.8.8".parse().unwrap()),
RouteAction::Vpn
);
// The other rule survives.
assert_eq!(
st.routes.read().await.classify("1.1.1.1".parse().unwrap()),
RouteAction::Direct
);
let list = handle_request(&st, Request::RouteList).await;
assert_eq!(list.cidrs.unwrap().len(), 1);
}
#[tokio::test]
async fn route_add_rejects_bad_cidr() {
let st = state();
let resp = handle_request(
&st,
Request::RouteAdd {
cidr: Some("not-a-cidr".into()),
domain: None,
action: "vpn".into(),
},
)
.await;
assert!(!resp.ok);
}
#[tokio::test]
async fn status_reports_default_and_counters() {
let st = state();
st.stats.tx_packets.store(5, Ordering::Relaxed);
st.stats.set_peer_id(Some("client-9".into()));
let resp = handle_request(&st, Request::Status).await;
assert!(resp.ok);
assert_eq!(resp.default.as_deref(), Some("vpn"));
assert_eq!(resp.tx_packets, Some(5));
assert_eq!(resp.peer_id.as_deref(), Some("client-9"));
}
}