diff --git a/config/server.toml.example b/config/server.toml.example index 436d1a2..104a025 100644 --- a/config/server.toml.example +++ b/config/server.toml.example @@ -18,13 +18,37 @@ cert = "~/.aura/server.crt" key = "~/.aura/server.key" [tunnel] -# Address pool for clients; v1 uses a single shared server-side TUN on this network. +# Fallback address pool. v2 reads the active pool config from [server.pool] below; this value is +# used as the pool when [server.pool] is omitted entirely or leaves `cidr` unset. The server-side +# TUN is brought up on whichever pool network is active, and the server's own TUN IP is that +# network's first usable host (e.g. 10.7.0.1 for 10.7.0.0/24). pool_cidr = "10.7.0.0/24" # TUN MTU (leave headroom under the path MTU for QUIC + Aura framing). mtu = 1420 # DNS server advertised to clients (informational in v1). dns = "10.7.0.1" +# v2 per-client IP pool. Each authenticated client gets its own address from `cidr`; the server's +# in-memory `client_ip -> connection` map demultiplexes TUN reads by destination IP. Omit the +# whole [server.pool] section to get the v1-compatible fallback: [tunnel] pool_cidr is reused as a +# dynamic-only pool with no static reservations. +[server.pool] +# Pool CIDR. Optional; defaults to [tunnel] pool_cidr when omitted. Must contain the server's own +# TUN address (the network's first host) and every entry in [server.pool.static]. +cidr = "10.7.0.0/24" +# Allocation strategy: +# "static_only" — only ids listed in [server.pool.static] are admitted; unknowns refused. +# "dynamic_only" — static ids are ignored (their IPs stay reserved); all get the next free address. +# "static_or_dynamic" — static reservation wins; unknown ids get a dynamic address (default). +strategy = "static_or_dynamic" + +# Optional `client_id -> ip` pinnings. 
The key is the verified Common Name from the client's +# certificate (see `aura pki issue-client --id `); the value must lie inside `cidr` above and +# must not collide with the server's own address or another reservation. +[server.pool.static] +# "phone-1" = "10.7.0.20" +# "laptop-1" = "10.7.0.21" + [mimicry] # Outer-TLS camouflage hostname the server presents/expects. sni = "cdn.example.com" diff --git a/crates/aura-cli/Cargo.toml b/crates/aura-cli/Cargo.toml index 1afaac3..eea8300 100644 --- a/crates/aura-cli/Cargo.toml +++ b/crates/aura-cli/Cargo.toml @@ -37,3 +37,5 @@ tokio.workspace = true # Loopback + PKI-roundtrip tests build certificate chains for the verifier. rustls-pki-types.workspace = true x509-parser.workspace = true +# Per-client routing tests implement PacketIo / PacketConnection traits on in-memory mocks. +async-trait.workspace = true diff --git a/crates/aura-cli/src/config.rs b/crates/aura-cli/src/config.rs index 6dfa609..53312f6 100644 --- a/crates/aura-cli/src/config.rs +++ b/crates/aura-cli/src/config.rs @@ -11,8 +11,9 @@ //! * [`ClientConfigFile::build_route_table`] turns `[tunnel.split]` into a [`RouteTable`] (CIDR //! rules applied directly; domain rules recorded for later DNS resolution). +use std::collections::{BTreeMap, HashMap}; use std::fs; -use std::net::SocketAddr; +use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; @@ -23,6 +24,8 @@ use aura_tunnel::{RouteAction, RouteTable}; use ipnetwork::IpNetwork; use serde::Deserialize; +use crate::pool::PoolStrategy; + // ---- server.toml ---------------------------------------------------------------------------- /// Top-level `server.toml` document. @@ -42,6 +45,50 @@ pub struct ServerConfigFile { pub transport: TransportSection, } +/// `[server.pool]` section: the v2 per-client IP pool + static reservations. +/// +/// Optional for backwards compatibility. 
When the section is omitted the server falls back to +/// `[tunnel] pool_cidr` interpreted as a [`PoolStrategy::DynamicOnly`] pool. The server's own IP +/// (the network-address + 1) is implicit; it is reserved automatically by [`crate::pool::IpPool`]. +/// +/// Example: +/// ```toml +/// [server.pool] +/// cidr = "10.8.0.0/24" +/// strategy = "static_or_dynamic" # or "static_only" / "dynamic_only" +/// +/// [server.pool.static] +/// "phone-1" = "10.8.0.2" +/// "laptop-1" = "10.8.0.3" +/// ``` +#[derive(Debug, Clone, Default, Deserialize)] +#[serde(default)] +pub struct ServerPoolSection { + /// Optional pool CIDR; when omitted the section's existence still selects the v2 path but the + /// CIDR falls back to `[tunnel] pool_cidr`. (The two-keys-are-fine semantics keeps editing + /// a `pool_cidr`-style config painless.) + pub cidr: Option<String>, + /// Allocation strategy: `"static_only"`, `"dynamic_only"`, or `"static_or_dynamic"`. + pub strategy: Option<String>, + /// `client_id -> ip` reservations applied under StaticOnly / StaticOrDynamic. The map key is + /// the verified Common Name from the client's certificate; the value is an IP in `cidr`. + #[serde(rename = "static")] + pub static_map: BTreeMap<String, String>, +} + +/// Fully resolved [`ServerPoolSection`] with parsed CIDR + strategy + static map. +/// +/// Built by [`ServerConfigFile::resolve_pool_config`]; fed to [`crate::pool::IpPool::new`]. +#[derive(Debug, Clone)] +pub struct ResolvedPoolConfig { + /// Pool CIDR. + pub cidr: IpNetwork, + /// Allocation strategy. + pub strategy: PoolStrategy, + /// Parsed `client_id -> ip` static reservations. + pub static_map: HashMap<String, IpAddr>, +} + /// `[server]` section. #[derive(Debug, Clone, Deserialize)] pub struct ServerSection { @@ -53,6 +100,10 @@ pub struct ServerSection { /// Number of accept workers (advisory in v1). #[serde(default = "default_workers")] pub workers: usize, + /// `[server.pool]` sub-section: v2 per-client IP pool. 
Omitting it triggers a v1-compatible + /// fallback that interprets `[tunnel] pool_cidr` as a [`PoolStrategy::DynamicOnly`] pool. + #[serde(default)] + pub pool: ServerPoolSection, } /// `[tunnel]` section of `server.toml`. @@ -388,6 +439,71 @@ impl ServerConfigFile { .with_context(|| format!("invalid [tunnel] pool_cidr '{}'", self.tunnel.pool_cidr)) } + /// Resolve the v2 `[server.pool]` configuration with v1 fallback. + /// + /// Resolution order: + /// + /// 1. If `[server.pool]` is non-empty (any of `cidr`, `strategy`, or a static entry), use it. + /// The `cidr` defaults to `[tunnel] pool_cidr` if unset; the `strategy` defaults to + /// [`PoolStrategy::StaticOrDynamic`]. + /// 2. Otherwise, fall back to `[tunnel] pool_cidr` as a [`PoolStrategy::DynamicOnly`] pool + /// with no static reservations. This is the v1-compatible path so old configs still work. + /// + /// Errors are returned as readable strings on bad CIDRs / strategies / static-IP parses. + pub fn resolve_pool_config(&self) -> anyhow::Result<ResolvedPoolConfig> { + let section = &self.server.pool; + let section_is_empty = + section.cidr.is_none() && section.strategy.is_none() && section.static_map.is_empty(); + + // Pick the CIDR: [server.pool] cidr wins, then [tunnel] pool_cidr. + let cidr_str = section + .cidr + .as_deref() + .unwrap_or(self.tunnel.pool_cidr.as_str()); + if cidr_str.is_empty() { + return Err(anyhow!( + "neither [server.pool] cidr nor [tunnel] pool_cidr is set — \ + the server needs an address pool to allocate per-client IPs" + )); + } + let cidr: IpNetwork = cidr_str + .parse() + .with_context(|| format!("invalid pool cidr '{cidr_str}'"))?; + + // Pick the strategy. When the section is wholly absent, fall back to DynamicOnly so + // old [tunnel] pool_cidr-only configs keep working without per-client static pinning. 
+ let strategy = if section_is_empty { + PoolStrategy::DynamicOnly + } else { + match section.strategy.as_deref().unwrap_or("static_or_dynamic") { + "static_only" => PoolStrategy::StaticOnly, + "dynamic_only" => PoolStrategy::DynamicOnly, + "static_or_dynamic" => PoolStrategy::StaticOrDynamic, + other => { + return Err(anyhow!( + "invalid [server.pool] strategy '{other}' \ + (expected 'static_only' | 'dynamic_only' | 'static_or_dynamic')" + )); + } + } + }; + + // Parse the static map. + let mut static_map: HashMap<String, IpAddr> = HashMap::new(); + for (cid, ip_str) in &section.static_map { + let ip: IpAddr = ip_str + .parse() + .with_context(|| format!("invalid IP '{ip_str}' for static reservation '{cid}'"))?; + static_map.insert(cid.clone(), ip); + } + + Ok(ResolvedPoolConfig { + cidr, + strategy, + static_map, + }) + } + /// Read the `[pki]` PEM files and build an [`aura_proto::ServerConfig`]. pub fn to_proto(&self) -> anyhow::Result<aura_proto::ServerConfig> { Ok(aura_proto::ServerConfig { @@ -860,6 +976,159 @@ local_ip = "10.7.0.2" assert_eq!(dial.endpoints.quic.unwrap().to_string(), "1.2.3.4:444"); } + /// `[server.pool]` is parsed in full (cidr + strategy + static reservations) and + /// `resolve_pool_config` builds a usable `ResolvedPoolConfig`. + #[test] + fn parses_full_server_pool_section() { + let s = r#" +[server] +name = "edge" +[server.pool] +cidr = "10.8.0.0/24" +strategy = "static_or_dynamic" +[server.pool.static] +"phone-1" = "10.8.0.20" +"laptop-1" = "10.8.0.21" +[pki] +ca_cert = "a" +cert = "b" +key = "c" +[tunnel] +pool_cidr = "10.7.0.0/24" +"#; + let cfg = ServerConfigFile::parse(s).expect("parse"); + // Raw section state. 
+ assert_eq!(cfg.server.pool.cidr.as_deref(), Some("10.8.0.0/24")); + assert_eq!( + cfg.server.pool.strategy.as_deref(), + Some("static_or_dynamic") + ); + assert_eq!(cfg.server.pool.static_map.len(), 2); + assert_eq!( + cfg.server + .pool + .static_map + .get("phone-1") + .map(String::as_str), + Some("10.8.0.20") + ); + + // Resolved view honours [server.pool] over [tunnel] pool_cidr. + let resolved = cfg.resolve_pool_config().expect("resolve"); + assert_eq!(resolved.cidr.to_string(), "10.8.0.0/24"); + assert_eq!(resolved.strategy, PoolStrategy::StaticOrDynamic); + assert_eq!(resolved.static_map.len(), 2); + assert_eq!( + resolved.static_map.get("laptop-1").copied(), + Some("10.8.0.21".parse::().unwrap()) + ); + } + + /// `[server.pool]` strategies parse: static_only / dynamic_only / static_or_dynamic. + #[test] + fn parses_pool_strategies() { + for (raw, expected) in [ + ("static_only", PoolStrategy::StaticOnly), + ("dynamic_only", PoolStrategy::DynamicOnly), + ("static_or_dynamic", PoolStrategy::StaticOrDynamic), + ] { + let s = format!( + r#" +[server] +name = "edge" +[server.pool] +cidr = "10.8.0.0/24" +strategy = "{raw}" +[pki] +ca_cert = "a" +cert = "b" +key = "c" +[tunnel] +pool_cidr = "10.7.0.0/24" +"# + ); + let cfg = ServerConfigFile::parse(&s).expect("parse"); + let resolved = cfg.resolve_pool_config().expect("resolve"); + assert_eq!(resolved.strategy, expected, "strategy {raw}"); + } + } + + /// An unknown strategy string is a hard error with a readable message. 
+ #[test] + fn rejects_unknown_pool_strategy() { + let s = r#" +[server] +name = "edge" +[server.pool] +cidr = "10.8.0.0/24" +strategy = "nonsense" +[pki] +ca_cert = "a" +cert = "b" +key = "c" +[tunnel] +pool_cidr = "10.7.0.0/24" +"#; + let cfg = ServerConfigFile::parse(s).expect("parse"); + let err = cfg.resolve_pool_config().unwrap_err().to_string(); + assert!(err.contains("strategy"), "{err}"); + assert!(err.contains("nonsense"), "{err}"); + } + + /// Backwards compat: an old server.toml without `[server.pool]` resolves to + /// dynamic_only over `[tunnel] pool_cidr` — the v1-compatible fallback. + #[test] + fn pool_fallback_when_section_omitted() { + let s = r#" +[server] +name = "edge" +[pki] +ca_cert = "a" +cert = "b" +key = "c" +[tunnel] +pool_cidr = "10.7.0.0/24" +"#; + let cfg = ServerConfigFile::parse(s).expect("parse minimal v1 server.toml"); + // No [server.pool] at all. + assert!(cfg.server.pool.cidr.is_none()); + assert!(cfg.server.pool.strategy.is_none()); + assert!(cfg.server.pool.static_map.is_empty()); + + let resolved = cfg.resolve_pool_config().expect("v1 fallback resolves"); + assert_eq!(resolved.cidr.to_string(), "10.7.0.0/24"); + assert_eq!( + resolved.strategy, + PoolStrategy::DynamicOnly, + "fallback strategy is dynamic_only" + ); + assert!(resolved.static_map.is_empty()); + } + + /// `[server.pool]` without `cidr` reuses `[tunnel] pool_cidr`. Strategy still defaults to + /// static_or_dynamic when only the section header is present (i.e. some pool field exists, + /// e.g. a static reservation). 
+ #[test] + fn pool_cidr_defaults_to_tunnel_pool_cidr_when_section_partial() { + let s = r#" +[server] +name = "edge" +[server.pool.static] +"phone-1" = "10.7.0.20" +[pki] +ca_cert = "a" +cert = "b" +key = "c" +[tunnel] +pool_cidr = "10.7.0.0/24" +"#; + let cfg = ServerConfigFile::parse(s).expect("parse"); + let resolved = cfg.resolve_pool_config().expect("resolve"); + assert_eq!(resolved.cidr.to_string(), "10.7.0.0/24"); + assert_eq!(resolved.strategy, PoolStrategy::StaticOrDynamic); + assert_eq!(resolved.static_map.len(), 1); + } + /// UDP and QUIC share the UDP socket layer; configuring the same port for both must be rejected. #[test] fn rejects_udp_quic_port_collision() { diff --git a/crates/aura-cli/src/lib.rs b/crates/aura-cli/src/lib.rs index 7c9b48e..e547b64 100644 --- a/crates/aura-cli/src/lib.rs +++ b/crates/aura-cli/src/lib.rs @@ -18,4 +18,6 @@ pub mod client; pub mod config; pub mod masks; pub mod pki; +pub mod pool; pub mod server; +pub mod server_router; diff --git a/crates/aura-cli/src/pool.rs b/crates/aura-cli/src/pool.rs new file mode 100644 index 0000000..71e9016 --- /dev/null +++ b/crates/aura-cli/src/pool.rs @@ -0,0 +1,430 @@ +//! Server-side **IP address pool** for v2 multi-client routing. +//! +//! In v1 every accepted client was bridged onto the same shared TUN, which only worked correctly +//! for one active client at a time. v2 lifts that limit: the server hands every authenticated +//! client its own IP address (per the [`PoolStrategy`] policy) and keeps an `IpAddr -> +//! PacketConnection` map so the data path can route TUN reads to the right client. +//! +//! ## Allocation policy +//! +//! The [`PoolStrategy`] field controls how an incoming `client_id` (the verified Common Name from +//! the peer's certificate) is mapped to an IP: +//! +//! * [`PoolStrategy::StaticOnly`]: only IDs present in the `static_map` are admitted; an unknown +//! ID is refused (`assign` returns `None`). +//! 
* [`PoolStrategy::DynamicOnly`]: the static map is ignored; every ID gets the next free address +//! from the pool. +//! * [`PoolStrategy::StaticOrDynamic`] (default): if the ID is statically reserved, hand back its +//! reservation; otherwise fall back to dynamic allocation. +//! +//! ## Reserved addresses +//! +//! [`IpPool::new`] excludes the network and (for IPv4) broadcast addresses, the configured +//! `server_ip` (the server's own TUN address), and every IP appearing as a value in `static_map`. +//! Dynamic allocation skips those plus anything currently in use. +//! +//! ## Cleanup +//! +//! When a client's per-connection task ends (the connection dropped or errored), the server calls +//! [`IpPool::release`] to return the IP to the pool. The static reservation rules still apply on +//! the next handshake from that client. + +use std::collections::{HashMap, HashSet}; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +use anyhow::{anyhow, Result}; +use ipnetwork::IpNetwork; +use tokio::sync::Mutex; + +/// Allocation policy used by [`IpPool::assign`]. +/// +/// See the [module docs](crate::pool) for what each variant means. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum PoolStrategy { + /// Only clients listed in the static map are admitted. + StaticOnly, + /// Static map is ignored; every client gets a dynamic address. + DynamicOnly, + /// Static reservation wins if present; otherwise allocate dynamically. + StaticOrDynamic, +} + +impl Default for PoolStrategy { + fn default() -> Self { + Self::StaticOrDynamic + } +} + +/// The server-side IP pool: assigns and releases per-client tunnel IPs. +/// +/// Safe to share across tasks: the live in-use set lives behind a [`tokio::sync::Mutex`]. The +/// critical sections are trivial (HashSet membership + insert/remove), so the mutex is plenty for +/// the connection-rate workloads this is sized for. +#[derive(Debug)] +pub struct IpPool { + /// Address pool, e.g. `10.8.0.0/24`. 
+ cidr: IpNetwork, + /// How to resolve a client id to an address. + strategy: PoolStrategy, + /// `client_id` -> reserved IP (used by Static/StaticOrDynamic strategies). + static_map: HashMap, + /// Addresses currently handed out. Both static and dynamic assignments live here so a release + /// puts the address back into the eligible pool regardless of how it was acquired. + in_use: Mutex>, + /// Addresses never eligible for dynamic allocation: the network/broadcast, the server's own + /// IP, and every IP claimed by a static reservation. + reserved: HashSet, +} + +impl IpPool { + /// Build a pool over `cidr` with the given `strategy` and `static_map`, reserving the server's + /// own `server_ip`. + /// + /// # Errors + /// Returns an error if `server_ip` is not contained in `cidr`, or if any static reservation + /// falls outside `cidr` or collides with another reservation. + pub fn new( + cidr: IpNetwork, + strategy: PoolStrategy, + static_map: HashMap, + server_ip: IpAddr, + ) -> Result { + if !cidr.contains(server_ip) { + return Err(anyhow!( + "server_ip {server_ip} is not contained in pool cidr {cidr}" + )); + } + + // Reserve the network address, the server's own IP, and (for IPv4) the broadcast address. + let mut reserved: HashSet = HashSet::new(); + reserved.insert(cidr.network()); + if let IpNetwork::V4(v4) = cidr { + reserved.insert(IpAddr::V4(v4.broadcast())); + } + reserved.insert(server_ip); + + // Validate every static reservation and add the assigned IPs to `reserved` so dynamic + // allocation never collides with one (even when the static client isn't connected yet). 
+ let mut seen: HashSet = HashSet::new(); + for (cid, ip) in &static_map { + if !cidr.contains(*ip) { + return Err(anyhow!( + "static reservation for '{cid}' -> {ip} is outside pool cidr {cidr}" + )); + } + if *ip == server_ip { + return Err(anyhow!( + "static reservation for '{cid}' -> {ip} collides with server's own ip {server_ip}" + )); + } + if !seen.insert(*ip) { + return Err(anyhow!( + "static reservation for '{cid}' -> {ip} collides with another static reservation" + )); + } + reserved.insert(*ip); + } + + Ok(Self { + cidr, + strategy, + static_map, + in_use: Mutex::new(HashSet::new()), + reserved, + }) + } + + /// The address pool this allocator manages. + pub fn cidr(&self) -> IpNetwork { + self.cidr + } + + /// The active allocation strategy. + pub fn strategy(&self) -> PoolStrategy { + self.strategy + } + + /// Assign an IP to a connecting client identified by `client_id`. + /// + /// Returns `None` if the policy refuses the client (`StaticOnly` and unknown id; a static + /// reservation is already in use; pool exhausted on dynamic allocation). + pub async fn assign(&self, client_id: &str) -> Option { + let mut in_use = self.in_use.lock().await; + // Static-or-Dynamic + Static-only: try the static map first. + if matches!( + self.strategy, + PoolStrategy::StaticOnly | PoolStrategy::StaticOrDynamic + ) { + if let Some(ip) = self.static_map.get(client_id).copied() { + if in_use.contains(&ip) { + // Refuse rather than serve duplicates: another live session is holding the + // static reservation. The caller logs the refusal. + return None; + } + in_use.insert(ip); + return Some(ip); + } + if matches!(self.strategy, PoolStrategy::StaticOnly) { + return None; // unknown id under a strict-static policy + } + } + // Dynamic: scan the CIDR for the first eligible free address. 
+ for candidate in iter_pool_ips(self.cidr) { + if self.reserved.contains(&candidate) { + continue; + } + if in_use.contains(&candidate) { + continue; + } + in_use.insert(candidate); + return Some(candidate); + } + None + } + + /// Return `ip` to the pool. Idempotent: releasing an IP not currently in use is a no-op. + pub async fn release(&self, ip: IpAddr) { + let mut in_use = self.in_use.lock().await; + in_use.remove(&ip); + } + + /// Snapshot of currently allocated addresses (mainly for tests / observability). + pub async fn in_use_snapshot(&self) -> HashSet { + self.in_use.lock().await.clone() + } +} + +/// Iterate over every address contained in `cidr` (network + hosts + broadcast). The caller is +/// responsible for filtering out reserved entries via [`IpPool::new`]'s `reserved` set. +fn iter_pool_ips(cidr: IpNetwork) -> Box> { + match cidr { + IpNetwork::V4(v4) => { + let start = u32::from(v4.network()); + let end = u32::from(v4.broadcast()); + Box::new((start..=end).map(|x| IpAddr::V4(Ipv4Addr::from(x)))) + } + IpNetwork::V6(v6) => { + // For IPv6 we don't have a tidy broadcast, so we iterate every address in the prefix. + // Pools in practice are tiny /120s or smaller; the dynamic scan stops at the first + // unused candidate, so even a huge prefix is fine — we just may not enumerate all of it. + let base = u128::from(v6.network()); + let prefix = v6.prefix(); + let count: u128 = if prefix == 0 { + u128::MAX + } else { + 1u128 << (128 - u32::from(prefix)) + }; + Box::new( + (0..count).map(move |offset| IpAddr::V6(Ipv6Addr::from(base.wrapping_add(offset)))), + ) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn ip(s: &str) -> IpAddr { + s.parse().unwrap() + } + + fn net(s: &str) -> IpNetwork { + s.parse().unwrap() + } + + #[tokio::test] + async fn dynamic_skips_reserved_network_broadcast_and_server() { + // /29 -> 10.0.0.0..10.0.0.7; server = .1; usable dynamic = .2..=.6. 
+ let pool = IpPool::new( + net("10.0.0.0/29"), + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.0.0.1"), + ) + .unwrap(); + + let assigned: Vec = { + let mut acc = Vec::new(); + for cid in ["a", "b", "c", "d", "e"] { + acc.push(pool.assign(cid).await.expect("should get an ip")); + } + acc + }; + // We got 5 distinct addresses, none equal to network/server/broadcast. + assert_eq!(assigned.len(), 5); + let set: HashSet<_> = assigned.iter().copied().collect(); + assert_eq!(set.len(), 5); + for forbidden in ["10.0.0.0", "10.0.0.1", "10.0.0.7"] { + assert!( + !set.contains(&ip(forbidden)), + "{forbidden} must not be handed out" + ); + } + // One more client must exhaust the pool. + assert!(pool.assign("f").await.is_none(), "pool should be exhausted"); + } + + #[tokio::test] + async fn static_wins_over_dynamic() { + // .5 is a static reservation for "alice". /29 has usable hosts .2..=.6 (5 addresses); + // .5 is reserved for static, leaving 4 dynamic slots: .2 .3 .4 .6. + let mut statics = HashMap::new(); + statics.insert("alice".to_string(), ip("10.0.0.5")); + let pool = IpPool::new( + net("10.0.0.0/29"), + PoolStrategy::StaticOrDynamic, + statics, + ip("10.0.0.1"), + ) + .unwrap(); + + // alice always gets her reservation. + assert_eq!(pool.assign("alice").await, Some(ip("10.0.0.5"))); + // 4 dynamic clients consume the remaining usable .2/.3/.4/.6 — never alice's .5. + for cid in ["bob", "carol", "dan", "ed"] { + let got = pool.assign(cid).await.expect("dynamic ip"); + assert_ne!(got, ip("10.0.0.5"), "dynamic must not collide with static"); + } + // The pool is now exhausted (alice + 4 dynamic = 5 usable addresses). 
+ assert!( + pool.assign("eve").await.is_none(), + "pool should be exhausted" + ); + } + + #[tokio::test] + async fn static_only_refuses_unknown_ids() { + let mut statics = HashMap::new(); + statics.insert("alice".to_string(), ip("10.0.0.5")); + let pool = IpPool::new( + net("10.0.0.0/29"), + PoolStrategy::StaticOnly, + statics, + ip("10.0.0.1"), + ) + .unwrap(); + + assert_eq!(pool.assign("alice").await, Some(ip("10.0.0.5"))); + // Unknown id under StaticOnly: refuse. + assert!(pool.assign("bob").await.is_none()); + } + + #[tokio::test] + async fn release_returns_ip_to_pool() { + let pool = IpPool::new( + net("10.0.0.0/30"), // .0 net, .1 server, .2 usable, .3 broadcast + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.0.0.1"), + ) + .unwrap(); + + let first = pool.assign("a").await.expect("first ip"); + assert_eq!(first, ip("10.0.0.2")); + // The pool now has nothing else free. + assert!(pool.assign("b").await.is_none()); + // After release, the only usable address is back. + pool.release(first).await; + assert_eq!(pool.assign("b").await, Some(ip("10.0.0.2"))); + } + + #[tokio::test] + async fn dynamic_only_ignores_static_map() { + let mut statics = HashMap::new(); + statics.insert("alice".to_string(), ip("10.0.0.5")); + let pool = IpPool::new( + net("10.0.0.0/29"), + PoolStrategy::DynamicOnly, + statics, + ip("10.0.0.1"), + ) + .unwrap(); + + // Even alice gets a dynamic IP — and crucially, never .5 (it is still reserved so dynamic + // allocations don't accidentally hand the static slot to someone else if the policy is + // flipped at runtime). 
+ let alice_ip = pool.assign("alice").await.expect("dynamic ip"); + assert_ne!(alice_ip, ip("10.0.0.5")); + } + + #[test] + fn rejects_server_ip_outside_cidr() { + let err = IpPool::new( + net("10.0.0.0/24"), + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("192.0.2.1"), + ) + .unwrap_err(); + assert!(err.to_string().contains("not contained")); + } + + #[test] + fn rejects_static_outside_cidr() { + let mut statics = HashMap::new(); + statics.insert("alice".to_string(), ip("192.0.2.1")); + let err = IpPool::new( + net("10.0.0.0/24"), + PoolStrategy::StaticOrDynamic, + statics, + ip("10.0.0.1"), + ) + .unwrap_err(); + assert!(err.to_string().contains("outside pool cidr")); + } + + #[test] + fn rejects_duplicate_static_reservation() { + let mut statics = HashMap::new(); + statics.insert("alice".to_string(), ip("10.0.0.5")); + statics.insert("bob".to_string(), ip("10.0.0.5")); + let err = IpPool::new( + net("10.0.0.0/24"), + PoolStrategy::StaticOrDynamic, + statics, + ip("10.0.0.1"), + ) + .unwrap_err(); + assert!(err.to_string().contains("collides with another")); + } + + #[test] + fn rejects_static_colliding_with_server_ip() { + let mut statics = HashMap::new(); + statics.insert("alice".to_string(), ip("10.0.0.1")); + let err = IpPool::new( + net("10.0.0.0/24"), + PoolStrategy::StaticOrDynamic, + statics, + ip("10.0.0.1"), + ) + .unwrap_err(); + assert!(err.to_string().contains("collides")); + } + + #[tokio::test] + async fn static_reservation_refused_when_already_in_use() { + let mut statics = HashMap::new(); + statics.insert("alice".to_string(), ip("10.0.0.5")); + let pool = IpPool::new( + net("10.0.0.0/24"), + PoolStrategy::StaticOrDynamic, + statics, + ip("10.0.0.1"), + ) + .unwrap(); + assert_eq!(pool.assign("alice").await, Some(ip("10.0.0.5"))); + // A second handshake from the same id while the first is still live is refused (the v1 + // policy: do not hand out the same IP twice; the caller logs a warning and drops the conn). 
+ assert!(pool.assign("alice").await.is_none()); + // After release, the second handshake succeeds. + pool.release(ip("10.0.0.5")).await; + assert_eq!(pool.assign("alice").await, Some(ip("10.0.0.5"))); + } + + #[tokio::test] + async fn default_strategy_is_static_or_dynamic() { + assert_eq!(PoolStrategy::default(), PoolStrategy::StaticOrDynamic); + } +} diff --git a/crates/aura-cli/src/server.rs b/crates/aura-cli/src/server.rs index 09bc5d3..3918ff5 100644 --- a/crates/aura-cli/src/server.rs +++ b/crates/aura-cli/src/server.rs @@ -1,23 +1,30 @@ //! `aura server`: bind a [`MultiServer`], accept connections on every enabled transport, and pump -//! packets to a server-side TUN. +//! packets to a shared server-side TUN via the per-client [`ServerRouter`]. //! -//! ## v1 data path -//! 1. Load `server.toml`, read the `[pki]` PEM files, build [`aura_proto::ServerConfig`]. -//! 2. [`MultiServer::bind`] on the `[transport]` endpoints (UDP / TCP / QUIC, per `order`), reusing -//! the listen IP from `[server] listen`. The QUIC outer (mimicry) cert reuses the Aura server leaf. +//! ## v2 data path +//! 1. Load `server.toml`, read the `[pki]` PEM files, build [`aura_proto::ServerConfig`], and +//! resolve the v2 `[server.pool]` (with v1 fallback to `[tunnel] pool_cidr` as a +//! dynamic-only pool). +//! 2. [`MultiServer::bind`] on the `[transport]` endpoints (UDP / TCP / QUIC, per `order`), +//! reusing the listen IP from `[server] listen`. The QUIC outer (mimicry) cert reuses the +//! Aura server leaf. //! 3. Start the admin IPC listener over a shared (empty) [`RouteTable`] + [`Stats`]. -//! 4. Accept loop: for each [`aura_transport::Accepted`] connection (regardless of which transport -//! carried it), create a server-side TUN on `[tunnel] pool_cidr` (the network's first host -//! address) and run [`AuraRouter`] to bridge the connection and the TUN. +//! 4. Create **one** server-side TUN at the network's first host address and spawn the +//! 
[`ServerRouter`]: a single owner of the TUN that demultiplexes outbound packets by +//! destination IP to the matching client connection, and forwards inbound packets from every +//! accepted client onto the TUN. +//! 5. Accept loop: for each [`aura_transport::Accepted`] connection, look up the verified +//! `peer_id` (the certificate CN), call [`IpPool::assign`] to get a tunnel IP, register the +//! connection under that IP, and spawn a per-conn inbound forwarder. When the per-conn task +//! ends, the forwarder releases the IP and unregisters the client. //! //! ## Privilege / scope notes (NOT auto-tested) -//! * Creating the TUN ([`AuraTun::create`]) needs **root** — the accept loop's data path is -//! therefore exercised only in a live, privileged run, not in unit tests. -//! * Binding the transport sockets on the configured ports (e.g. `:443`) typically needs privileges. -//! * Multi-client IP-pool allocation / NAT is **out of v1 scope**: v1 bridges to one shared TUN, so -//! it is correct for a single active client. The accept loop still accepts many connections (each -//! gets its own router task), which is enough to demonstrate the end-to-end path. (The custom-UDP -//! backend is single-peer-per-accept in v1; prefer TCP/QUIC for many concurrent clients.) +//! * Creating the TUN ([`AuraTun::create`]) needs **root** — the live data path is exercised in +//! a privileged run, not in unit tests. The [`ServerRouter`] is generic over +//! [`aura_tunnel::PacketIo`], and the tests in `server_router.rs` drive it with a `MockTun` to +//! cover the dispatch/inbound logic without root. +//! * Binding the transport sockets on the configured ports (e.g. `:443`) typically needs +//! privileges. 
use std::path::Path; use std::sync::Arc; @@ -25,20 +32,42 @@ use std::time::Duration; use anyhow::Context; use aura_transport::MultiServer; -use aura_tunnel::{AuraRouter, AuraTun, RouteAction, RouteTable}; +use aura_tunnel::{AuraTun, RouteAction, RouteTable}; use ipnetwork::IpNetwork; use tokio::sync::RwLock; use crate::admin::{self, AdminState, Stats}; use crate::config::ServerConfigFile; use crate::masks::MaskRotator; +use crate::pool::IpPool; +use crate::server_router::ServerRouter; /// Entry point for `aura server --config ` (and optional `--admin-socket`). pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> { let cfg = ServerConfigFile::load(config_path)?; let listen = cfg.listen_addr()?; let proto_cfg = cfg.to_proto()?; - let pool = cfg.pool_network()?; + + // Resolve the v2 [server.pool] (with v1 fallback to [tunnel] pool_cidr as a dynamic-only pool). + let resolved_pool = cfg + .resolve_pool_config() + .context("resolving [server.pool] / [tunnel] pool_cidr")?; + let server_tun_ip = first_host(resolved_pool.cidr); + let prefix = resolved_pool.cidr.prefix(); + + let pool = Arc::new(IpPool::new( + resolved_pool.cidr, + resolved_pool.strategy, + resolved_pool.static_map.clone(), + server_tun_ip, + )?); + tracing::info!( + cidr = %resolved_pool.cidr, + strategy = ?resolved_pool.strategy, + server_tun_ip = %server_tun_ip, + static_reservations = resolved_pool.static_map.len(), + "ip pool ready" + ); // Per-transport endpoints (UDP/TCP/QUIC) derived from the listen IP + `[transport]` ports. 
let endpoints = cfg.transport_endpoints()?; @@ -71,7 +100,7 @@ pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> { tracing::info!( name = %cfg.server.name, %listen, - pool = %cfg.tunnel.pool_cidr, + pool = %resolved_pool.cidr, workers = cfg.server.workers, dns = ?cfg.tunnel.dns, mimicry_sni = ?cfg.mimicry.sni, @@ -131,7 +160,7 @@ pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> { }); } - // Shared routing table (server-side classification is trivial in v1: everything via VPN) + + // Shared routing table (server-side classification is trivial in v2: everything via VPN) + // stats, exposed over the admin socket. let routes = Arc::new(RwLock::new(RouteTable::new(RouteAction::Vpn))); let stats = Arc::new(Stats::new()); @@ -148,45 +177,90 @@ pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> { } }); - // Accept loop. Each accepted connection (from any transport) gets a server-side TUN and a router - // task. `MultiServer::accept` yields `None` only when every transport's accept loop has stopped. + // Create the one shared server-side TUN and start the per-client router. The TUN owner runs + // in its own task; the accept-loop only registers connections and spawns per-conn forwarders. let mtu = cfg.tunnel.mtu; + let tun = AuraTun::create("aura-srv0", server_tun_ip, prefix, mtu) + .await + .context("failed to create server TUN (needs root)")?; + let router = ServerRouter::new(tun, Arc::clone(&pool)); + let server_routes = router.routes(); + let inbound_tx = router.inbound_sender(); + let router_task = tokio::spawn(async move { + if let Err(e) = router.run().await { + tracing::error!(error = %e, "server router exited"); + } + }); + + // Accept loop. Each accepted connection (from any transport) is assigned an IP from the pool + // and registered with the [`ServerRouter`]; a per-conn task forwards inbound packets into the + // shared TUN. 
`MultiServer::accept` yields `None` only when every transport's accept loop has + // stopped. loop { let next = { let mut srv = server.lock().await; srv.accept().await }; let Some(accepted) = next else { break }; - let peer = accepted.peer_id.clone(); + let peer_id = accepted.peer_id.clone(); let mode = accepted.mode; let conn = accepted.conn; - stats.set_peer_id(peer.clone()); - tracing::info!(peer = ?peer, %mode, "accepted authenticated client"); - let routes = Arc::clone(&routes); - let tun_ip = first_host(pool); - let prefix = pool.prefix(); - tokio::spawn(async move { - let tun = match AuraTun::create("aura-srv0", tun_ip, prefix, mtu).await { - Ok(t) => t, - Err(e) => { - tracing::error!(error = %e, "failed to create server TUN (needs root)"); - return; - } - }; - let router = AuraRouter::new(tun, routes, conn); - if let Err(e) = router.run().await { - tracing::warn!(peer = ?peer, %mode, error = %e, "server router stopped"); + // Pick the client id used for static-pool lookup. The certificate CN is the only + // identity we can trust here; if absent (defensive — every authenticated connection has + // one in practice) fall back to the fixed placeholder id `"unknown"` (shared, not unique) + // so dynamic allocation still runs while static-only refuses ids with no reservation. + let client_id = peer_id.clone().unwrap_or_else(|| "unknown".to_string()); + let assigned_ip = match pool.assign(&client_id).await { + Some(ip) => ip, + None => { + tracing::warn!( + peer = ?peer_id, %mode, strategy = ?pool.strategy(), + "refusing connection: ip pool denied an address (unknown id under static_only, \ + duplicate static reservation, or pool exhausted)" + ); + // Dropping `conn` here is best-effort: the trait object will be released, and + // the underlying transport's close path tears the session down. + drop(conn); + continue; } - }); + }; + + // Record peer + log. (Stats' single peer_id slot is v1-ish — kept for backwards-compat + // with the admin status command; the live per-IP routes are the new source of truth.)
+ stats.set_peer_id(peer_id.clone()); + tracing::info!( + peer = ?peer_id, %mode, %assigned_ip, + "accepted authenticated client; assigned tunnel ip" + ); + + // Register the connection and spawn its inbound forwarder. + if let Some(prev) = server_routes.register(assigned_ip, Arc::clone(&conn)).await { + tracing::warn!( + %assigned_ip, + "evicting a previously-registered connection under the same ip — \ + should not happen if the pool is consistent" + ); + // The prev `Arc` is dropped here; the transport closes its + // session on the last Arc drop. + drop(prev); + } + ServerRouter::::spawn_inbound_forwarder( + server_routes.clone(), + inbound_tx.clone(), + conn, + assigned_ip, + peer_id, + ); } tracing::warn!("all transport accept loops stopped; server exiting"); + router_task.abort(); Ok(()) } /// The first usable host address of a network (network address + 1 for IPv4; the network address -/// itself for the degenerate cases). Used as the server-side TUN's own address from `pool_cidr`. +/// itself for the degenerate cases). Used as the server-side TUN's own address from the pool. fn first_host(net: IpNetwork) -> std::net::IpAddr { match net { IpNetwork::V4(v4) => { diff --git a/crates/aura-cli/src/server_router.rs b/crates/aura-cli/src/server_router.rs new file mode 100644 index 0000000..ac2afa6 --- /dev/null +++ b/crates/aura-cli/src/server_router.rs @@ -0,0 +1,497 @@ +//! Server-side per-client routing (v2 §18): one TUN, many connections, IP-pool dispatch. +//! +//! The v1 server bridged each accepted connection onto its own short-lived TUN, which only +//! supported a single active client at a time. v2 keeps **one** server-side TUN and +//! demultiplexes traffic to/from many concurrent clients using the [`IpPool`] -allocated client +//! IP as the routing key: +//! +//! * **TUN read -> conn send** (outbound, server -> peers): one task owns `tun.read_packet()`. +//! 
For each packet it parses the destination IP, looks up the matching client connection in the +//! `by_ip` map, and `send_packet`s the packet on that connection. Packets for unknown +//! destinations are dropped with a trace. +//! * **conn recv -> TUN write** (inbound, peers -> server): every accepted client gets a +//! dedicated task that loops on `conn.recv_packet().await` and forwards the result into a +//! single mpsc channel; one TUN-owner task drains the channel and writes packets to the TUN. +//! +//! Sharing a single `PacketIo` between the read-loop and the write-loop is awkward because both +//! halves take `&mut self`. Just like [`aura_tunnel::AuraRouter`], the router parks the TUN +//! behind one `select!` loop that handles both directions on the same task: a TUN read race +//! against a write coming off the inbound mpsc. +//! +//! The router is generic over [`aura_tunnel::PacketIo`] so tests can substitute a `MockTun` and +//! drive both halves without root. + +use std::collections::HashMap; +use std::net::IpAddr; +use std::sync::Arc; + +use aura_proto::PacketConnection; +use aura_tunnel::router::dst_ip; +use aura_tunnel::PacketIo; +use tokio::sync::{mpsc, RwLock}; + +use crate::pool::IpPool; + +/// Bounded capacity of the per-server inbound mpsc the per-conn tasks fan into. +/// +/// Sized to absorb short bursts from many clients without making backpressure cheap. The TUN +/// owner drains continuously, so this only fills when the kernel is briefly behind on writes. +const INBOUND_CAPACITY: usize = 4096; + +/// Per-client routing state shared between the registration path (accept-loop) and the data path +/// (TUN owner / per-conn tasks). +/// +/// Cheap to clone (it is `Arc`-shaped internally) so the server's accept-loop and per-connection +/// tasks can all hold references to the same dispatch map. +#[derive(Clone)] +pub struct ServerRoutes { + /// Live `client_ip -> connection` map; the TUN owner reads, accept/disconnect mutate. 
+ by_ip: Arc>>>, + /// IP pool the server uses to assign and release client addresses. + pool: Arc, +} + +impl ServerRoutes { + /// Build empty per-server routes over the given [`IpPool`]. + pub fn new(pool: Arc) -> Self { + Self { + by_ip: Arc::new(RwLock::new(HashMap::new())), + pool, + } + } + + /// Borrow the [`IpPool`] for assign/release calls. + pub fn pool(&self) -> &Arc { + &self.pool + } + + /// Register a connection under `ip`. Returns the previously registered connection (if any), + /// which the caller should treat as a forced-disconnect race. + pub async fn register( + &self, + ip: IpAddr, + conn: Arc, + ) -> Option> { + self.by_ip.write().await.insert(ip, conn) + } + + /// Remove the connection registered under `ip`. + pub async fn unregister(&self, ip: IpAddr) { + self.by_ip.write().await.remove(&ip); + } + + /// Snapshot the current dispatch map (mainly for tests / observability). + pub async fn snapshot_ips(&self) -> Vec { + self.by_ip.read().await.keys().copied().collect() + } + + /// Send a packet to the connection registered under `dst`. Returns `Ok(true)` if the packet + /// was queued to a connection; `Ok(false)` if no connection is registered under `dst`. Errors + /// from the connection's `send_packet` surface verbatim — the caller decides whether to + /// evict the client. + pub async fn dispatch(&self, dst: IpAddr, pkt: &[u8]) -> anyhow::Result { + let conn = { + let map = self.by_ip.read().await; + map.get(&dst).cloned() + }; + match conn { + Some(c) => { + c.send_packet(pkt).await?; + Ok(true) + } + None => Ok(false), + } + } +} + +/// The per-server "one TUN, many clients" router. +/// +/// Owns a [`PacketIo`] device and a [`ServerRoutes`] dispatch table. [`Self::run`] drains +/// outbound TUN reads into the right connection, and inbound packets from any registered +/// connection to the single TUN. 
+/// +/// The accept-loop calls [`Self::spawn_inbound_forwarder`] for each `MultiServer::accept` so +/// per-conn state is created up-front and torn down when the connection ends. +pub struct ServerRouter { + tun: P, + routes: ServerRoutes, + /// All per-conn tasks fan their inbound packets into this single channel; the owner of `tun` + /// drains the receiver. + inbound_tx: mpsc::Sender>, + inbound_rx: mpsc::Receiver>, +} + +impl ServerRouter

{ + /// Build a fresh router with empty routes and the given pool. + pub fn new(tun: P, pool: Arc) -> Self { + Self::from_routes(tun, ServerRoutes::new(pool)) + } + + /// Build a router from an existing [`ServerRoutes`] (mainly for tests that pre-seed routes). + pub fn from_routes(tun: P, routes: ServerRoutes) -> Self { + let (inbound_tx, inbound_rx) = mpsc::channel::>(INBOUND_CAPACITY); + Self { + tun, + routes, + inbound_tx, + inbound_rx, + } + } + + /// Clone the shared routes (so the accept-loop can call [`Self::spawn_inbound_forwarder`] + /// from outside the router task). + pub fn routes(&self) -> ServerRoutes { + self.routes.clone() + } + + /// Clone the inbound sender so the accept-loop / handlers can spawn per-conn forwarder tasks + /// without holding a reference to the router. + pub fn inbound_sender(&self) -> mpsc::Sender> { + self.inbound_tx.clone() + } + + /// Spawn the per-connection inbound forwarder. + /// + /// Loops `conn.recv_packet()` and forwards each packet into the shared `inbound_tx`. When the + /// connection errors out (peer closed, transport error, etc.) the function unregisters `ip` + /// from the routes and releases it back to the pool. + pub fn spawn_inbound_forwarder( + routes: ServerRoutes, + inbound_tx: mpsc::Sender>, + conn: Arc, + ip: IpAddr, + peer_id: Option, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + loop { + match conn.recv_packet().await { + Ok(pkt) => { + if inbound_tx.send(pkt).await.is_err() { + tracing::debug!( + %ip, peer = ?peer_id, + "server router inbound channel closed; ending per-conn task" + ); + break; + } + } + Err(e) => { + tracing::info!( + %ip, peer = ?peer_id, error = %e, + "client connection ended; releasing ip and unregistering" + ); + break; + } + } + } + routes.unregister(ip).await; + routes.pool().release(ip).await; + }) + } + + /// Run the TUN owner loop: read packets from the TUN, dispatch to the right connection; + /// write packets coming in from the inbound channel to the TUN.
Returns when either half + /// fails. + pub async fn run(mut self) -> anyhow::Result<()> { + loop { + tokio::select! { + read = self.tun.read_packet() => { + match read { + Ok(pkt) => { + if let Err(e) = self.route_outbound(&pkt).await { + // Connection-level errors are non-fatal for the router: log and + // keep serving other clients (the per-conn task will see its own + // error path and release the IP). + tracing::warn!(error = %e, "server router outbound send failed"); + } + } + Err(e) => { + return Err(anyhow::Error::new(e).context("server TUN read failed")); + } + } + } + maybe_pkt = self.inbound_rx.recv() => { + match maybe_pkt { + Some(pkt) => { + if let Err(e) = self.tun.write_packet(&pkt).await { + return Err(anyhow::Error::new(e).context("server TUN write failed")); + } + } + None => { + // All inbound senders dropped (the accept-loop and all per-conn + // tasks). The router has no work left; exit. + return Ok(()); + } + } + } + } + } + } + + /// Classify one outbound packet and dispatch it to the matching client connection. + async fn route_outbound(&self, pkt: &[u8]) -> anyhow::Result<()> { + let Some(dst) = dst_ip(pkt) else { + tracing::trace!(len = pkt.len(), "dropping unparseable outbound packet"); + return Ok(()); + }; + match self.routes.dispatch(dst, pkt).await? { + true => Ok(()), + false => { + tracing::trace!(%dst, len = pkt.len(), "no client registered for destination; dropping"); + Ok(()) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::HashMap; + use std::net::Ipv4Addr; + + use async_trait::async_trait; + use tokio::sync::{mpsc, Mutex as TokMutex}; + + use crate::pool::PoolStrategy; + + /// In-memory fake TUN: `read_packet` drains an injected queue, `write_packet` forwards to a + /// channel the test reads. Identical contract to the one in `aura-tunnel/tests/routes.rs`. 
+ struct MockTun { + inbound: mpsc::Receiver>, + written: mpsc::Sender>, + } + + #[async_trait] + impl PacketIo for MockTun { + async fn read_packet(&mut self) -> std::io::Result> { + match self.inbound.recv().await { + Some(pkt) => Ok(pkt), + None => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "mock TUN closed", + )), + } + } + + async fn write_packet(&mut self, packet: &[u8]) -> std::io::Result<()> { + self.written + .send(packet.to_vec()) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "test dropped")) + } + } + + /// Mock packet connection: `send_packet` forwards to a channel; `recv_packet` drains another. + struct MockConn { + sent: mpsc::Sender>, + to_recv: TokMutex>>, + } + + #[async_trait] + impl PacketConnection for MockConn { + async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> { + self.sent.send(packet.to_vec()).await?; + Ok(()) + } + async fn recv_packet(&self) -> anyhow::Result> { + let mut rx = self.to_recv.lock().await; + rx.recv() + .await + .ok_or_else(|| anyhow::anyhow!("mock conn closed")) + } + } + + fn ipv4_packet_to(dst: Ipv4Addr) -> Vec { + let mut pkt = vec![0u8; 20]; + pkt[0] = 0x45; + let o = dst.octets(); + pkt[16..20].copy_from_slice(&o); + pkt + } + + fn ip(s: &str) -> IpAddr { + s.parse().unwrap() + } + + fn net(s: &str) -> ipnetwork::IpNetwork { + s.parse().unwrap() + } + + /// End-to-end test of the server router's data path: + /// * Two MockConns are registered under .2 and .3. + /// * A TUN packet to .2 should reach conn-A; to .3 -> conn-B; to .99 -> dropped. + /// * A packet sent into conn-A's recv channel should be written verbatim to the TUN. + #[tokio::test] + async fn server_router_dispatches_by_destination_ip() { + // Build a pool with .1 reserved as the server's IP. + let pool = Arc::new( + IpPool::new( + net("10.0.0.0/29"), + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.0.0.1"), + ) + .unwrap(), + ); + + // Wire up the mock TUN and the router. 
+ let (tun_in_tx, tun_in_rx) = mpsc::channel::>(8); + let (tun_out_tx, mut tun_out_rx) = mpsc::channel::>(8); + let tun = MockTun { + inbound: tun_in_rx, + written: tun_out_tx, + }; + let router = ServerRouter::new(tun, pool.clone()); + let routes = router.routes(); + let inbound_tx = router.inbound_sender(); + + // Allocate two clients via the pool and register them under those IPs. + let ip_a = pool.assign("client-a").await.expect("ip-a"); + let ip_b = pool.assign("client-b").await.expect("ip-b"); + + let (sent_a_tx, mut sent_a_rx) = mpsc::channel::>(8); + let (recv_a_tx, recv_a_rx) = mpsc::channel::>(8); + let conn_a: Arc = Arc::new(MockConn { + sent: sent_a_tx, + to_recv: TokMutex::new(recv_a_rx), + }); + let (sent_b_tx, mut sent_b_rx) = mpsc::channel::>(8); + let (recv_b_tx, recv_b_rx) = mpsc::channel::>(8); + let conn_b: Arc = Arc::new(MockConn { + sent: sent_b_tx, + to_recv: TokMutex::new(recv_b_rx), + }); + routes.register(ip_a, Arc::clone(&conn_a)).await; + routes.register(ip_b, Arc::clone(&conn_b)).await; + + // Spawn per-conn forwarders and the router itself. + let _fwd_a = ServerRouter::::spawn_inbound_forwarder( + routes.clone(), + inbound_tx.clone(), + Arc::clone(&conn_a), + ip_a, + Some("client-a".into()), + ); + let _fwd_b = ServerRouter::::spawn_inbound_forwarder( + routes.clone(), + inbound_tx, + Arc::clone(&conn_b), + ip_b, + Some("client-b".into()), + ); + let handle = tokio::spawn(router.run()); + + // 1. TUN read -> dispatch to conn-A by destination = ip_a. + let pkt_to_a = ipv4_packet_to(match ip_a { + IpAddr::V4(v4) => v4, + _ => panic!("ipv4 in this test"), + }); + tun_in_tx.send(pkt_to_a.clone()).await.unwrap(); + let got = tokio::time::timeout(std::time::Duration::from_secs(2), sent_a_rx.recv()) + .await + .expect("router did not dispatch to client-a in time") + .expect("client-a sent channel closed"); + assert_eq!(got, pkt_to_a, "conn-a should receive the packet verbatim"); + + // 2. TUN read -> dispatch to conn-B by destination = ip_b. 
+ let pkt_to_b = ipv4_packet_to(match ip_b { + IpAddr::V4(v4) => v4, + _ => panic!("ipv4 in this test"), + }); + tun_in_tx.send(pkt_to_b.clone()).await.unwrap(); + let got = tokio::time::timeout(std::time::Duration::from_secs(2), sent_b_rx.recv()) + .await + .expect("router did not dispatch to client-b in time") + .expect("client-b sent channel closed"); + assert_eq!(got, pkt_to_b); + + // 3. TUN read -> destination .99 has nobody registered; must be silently dropped (no + // crash, nothing reaches either conn). + let pkt_to_unknown = ipv4_packet_to(Ipv4Addr::new(10, 0, 0, 99)); + tun_in_tx.send(pkt_to_unknown).await.unwrap(); + let res = + tokio::time::timeout(std::time::Duration::from_millis(200), sent_a_rx.recv()).await; + assert!(res.is_err(), "no packet should reach conn-a"); + let res = + tokio::time::timeout(std::time::Duration::from_millis(200), sent_b_rx.recv()).await; + assert!(res.is_err(), "no packet should reach conn-b"); + + // 4. Inbound: conn-A's recv channel -> TUN write. + let in_pkt = ipv4_packet_to(Ipv4Addr::new(8, 8, 8, 8)); + recv_a_tx.send(in_pkt.clone()).await.unwrap(); + let written = tokio::time::timeout(std::time::Duration::from_secs(2), tun_out_rx.recv()) + .await + .expect("router did not forward inbound to TUN in time") + .expect("TUN write channel closed"); + assert_eq!(written, in_pkt); + + // 5. Inbound from conn-B: TUN must also receive it (one TUN, many sources). + let in_pkt_b = ipv4_packet_to(Ipv4Addr::new(1, 1, 1, 1)); + recv_b_tx.send(in_pkt_b.clone()).await.unwrap(); + let written = tokio::time::timeout(std::time::Duration::from_secs(2), tun_out_rx.recv()) + .await + .expect("inbound-b did not reach TUN") + .expect("TUN write channel closed"); + assert_eq!(written, in_pkt_b); + + // Cleanup: closing the TUN read side bubbles an EOF up the run-loop. + drop(tun_in_tx); + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await; + } + + /// When a per-conn task ends (e.g. 
the connection errors), the router must release the IP + /// back to the pool and remove the dispatch entry so a re-registration can succeed. + #[tokio::test] + async fn per_conn_task_releases_ip_and_unregisters_on_error() { + let pool = Arc::new( + IpPool::new( + net("10.0.0.0/30"), // only .2 is dynamically usable + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.0.0.1"), + ) + .unwrap(), + ); + + let (_tun_in_tx, tun_in_rx) = mpsc::channel::>(8); + let (tun_out_tx, _tun_out_rx) = mpsc::channel::>(8); + let tun = MockTun { + inbound: tun_in_rx, + written: tun_out_tx, + }; + let router = ServerRouter::new(tun, pool.clone()); + let routes = router.routes(); + let inbound_tx = router.inbound_sender(); + let _handle = tokio::spawn(router.run()); + + let ip_a = pool.assign("client-a").await.expect("only usable ip"); + let (sent_tx, _sent_rx) = mpsc::channel::>(8); + let (recv_tx, recv_rx) = mpsc::channel::>(8); + let conn: Arc = Arc::new(MockConn { + sent: sent_tx, + to_recv: TokMutex::new(recv_rx), + }); + routes.register(ip_a, Arc::clone(&conn)).await; + let fwd = ServerRouter::::spawn_inbound_forwarder( + routes.clone(), + inbound_tx, + Arc::clone(&conn), + ip_a, + Some("client-a".into()), + ); + + // Drop the recv sender to make `conn.recv_packet()` return an error, which ends the task. + drop(recv_tx); + let _ = tokio::time::timeout(std::time::Duration::from_secs(2), fwd).await; + + // The IP must be releasable now (re-assigned without exhaustion). + let next = pool.assign("client-b").await.expect("ip back in pool"); + assert_eq!(next, ip_a, "released ip should be reusable"); + // And the dispatch map should not still contain a stale entry under ip_a. 
+ let snap = routes.snapshot_ips().await; + assert!( + !snap.contains(&ip_a), + "stale entry should have been unregistered" + ); + } +} diff --git a/crates/aura-cli/tests/pool.rs b/crates/aura-cli/tests/pool.rs new file mode 100644 index 0000000..39419e8 --- /dev/null +++ b/crates/aura-cli/tests/pool.rs @@ -0,0 +1,101 @@ +//! Integration-level coverage of [`aura_cli::pool::IpPool`]. +//! +//! The unit tests inside `src/pool.rs` cover the strategy matrix in isolation. These tests +//! exercise the same surface as an external crate would — through the public re-exports — so +//! that the API contract stays usable for downstream consumers (the `aura server` runtime). + +use std::collections::HashMap; +use std::net::IpAddr; + +use aura_cli::pool::{IpPool, PoolStrategy}; +use ipnetwork::IpNetwork; + +fn ip(s: &str) -> IpAddr { + s.parse().unwrap() +} + +fn net(s: &str) -> IpNetwork { + s.parse().unwrap() +} + +/// Smoke: build a tiny /29 pool, allocate, release, re-allocate the same address. +#[tokio::test] +async fn pool_allocate_release_cycle() { + let pool = IpPool::new( + net("10.8.0.0/29"), + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.8.0.1"), + ) + .expect("pool"); + + let a = pool.assign("a").await.expect("first"); + let b = pool.assign("b").await.expect("second"); + assert_ne!(a, b, "distinct clients must get distinct addresses"); + let snap_before = pool.in_use_snapshot().await; + assert!(snap_before.contains(&a)); + assert!(snap_before.contains(&b)); + + pool.release(a).await; + let snap_after = pool.in_use_snapshot().await; + assert!(!snap_after.contains(&a)); + + // Next dynamic allocation can hand back `a`'s address. + let again = pool.assign("c").await.expect("recycled"); + assert_eq!(again, a, "released ip should be reusable"); +} + +/// `StaticOrDynamic` honours a statically reserved address and never hands it to anyone else. 
+#[tokio::test] +async fn pool_static_reservation_is_pinned() { + let mut statics = HashMap::new(); + statics.insert("phone-1".to_string(), ip("10.8.0.20")); + let pool = IpPool::new( + net("10.8.0.0/24"), + PoolStrategy::StaticOrDynamic, + statics, + ip("10.8.0.1"), + ) + .expect("pool"); + + assert_eq!(pool.assign("phone-1").await, Some(ip("10.8.0.20"))); + // Many dynamic allocations: none of them must collide with the static reservation. + for cid in ["a", "b", "c", "d", "e"] { + let got = pool.assign(cid).await.expect("dynamic"); + assert_ne!(got, ip("10.8.0.20"), "{cid} got the static reservation"); + } +} + +/// `StaticOnly` refuses unknown ids — the strict deployment mode. +#[tokio::test] +async fn pool_static_only_refuses_unknown() { + let mut statics = HashMap::new(); + statics.insert("phone-1".to_string(), ip("10.8.0.20")); + let pool = IpPool::new( + net("10.8.0.0/24"), + PoolStrategy::StaticOnly, + statics, + ip("10.8.0.1"), + ) + .expect("pool"); + assert!(pool.assign("phone-1").await.is_some()); + assert!( + pool.assign("randomer").await.is_none(), + "StaticOnly must refuse unknown ids" + ); +} + +/// Exhausting the pool returns `None` instead of looping forever or panicking. +#[tokio::test] +async fn pool_exhaustion_returns_none() { + // /30 -> 4 total, .0 net, .1 server, .2 usable, .3 broadcast => 1 dynamic slot. + let pool = IpPool::new( + net("10.8.0.0/30"), + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.8.0.1"), + ) + .expect("pool"); + assert_eq!(pool.assign("a").await, Some(ip("10.8.0.2"))); + assert!(pool.assign("b").await.is_none(), "pool exhausted"); +} diff --git a/crates/aura-cli/tests/server_routing.rs b/crates/aura-cli/tests/server_routing.rs new file mode 100644 index 0000000..5b194f8 --- /dev/null +++ b/crates/aura-cli/tests/server_routing.rs @@ -0,0 +1,261 @@ +//! Integration tests for the server-side per-client routing path +//! ([`aura_cli::server_router::ServerRouter`] + [`aura_cli::pool::IpPool`]). +//! +//! 
These tests do not bind any sockets and do not need root: they wire mock +//! [`PacketIo`](aura_tunnel::PacketIo) and [`PacketConnection`](aura_proto::PacketConnection) +//! implementations into the router and drive both directions through channels. + +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::Arc; +use std::time::Duration; + +use async_trait::async_trait; +use aura_cli::pool::{IpPool, PoolStrategy}; +use aura_cli::server_router::ServerRouter; +use aura_proto::PacketConnection; +use aura_tunnel::PacketIo; +use ipnetwork::IpNetwork; +use tokio::sync::{mpsc, Mutex}; + +// ---- in-memory mocks --------------------------------------------------------------------------- + +/// Fake TUN backed by mpsc channels; identical to the one in aura-tunnel's routes test. +struct MockTun { + inbound: mpsc::Receiver>, + written: mpsc::Sender>, +} + +#[async_trait] +impl PacketIo for MockTun { + async fn read_packet(&mut self) -> std::io::Result> { + match self.inbound.recv().await { + Some(pkt) => Ok(pkt), + None => Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "mock TUN closed", + )), + } + } + async fn write_packet(&mut self, packet: &[u8]) -> std::io::Result<()> { + self.written + .send(packet.to_vec()) + .await + .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "test dropped")) + } +} + +/// Fake encrypted connection backed by mpsc. 
+struct MockConn { + sent: mpsc::Sender>, + to_recv: Mutex>>, +} + +#[async_trait] +impl PacketConnection for MockConn { + async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> { + self.sent.send(packet.to_vec()).await?; + Ok(()) + } + async fn recv_packet(&self) -> anyhow::Result> { + let mut rx = self.to_recv.lock().await; + rx.recv() + .await + .ok_or_else(|| anyhow::anyhow!("mock conn closed")) + } +} + +fn ipv4_packet_to(dst: Ipv4Addr) -> Vec { + let mut pkt = vec![0u8; 20]; + pkt[0] = 0x45; + let o = dst.octets(); + pkt[16..20].copy_from_slice(&o); + pkt +} + +fn ip(s: &str) -> IpAddr { + s.parse().unwrap() +} + +fn netw(s: &str) -> IpNetwork { + s.parse().unwrap() +} + +// ---- the main scenario ------------------------------------------------------------------------- + +/// Two clients, one TUN: each TUN packet must reach the right connection, and inbound packets +/// from either connection must reach the TUN. An unknown destination IP is silently dropped. +#[tokio::test] +async fn server_router_per_client_dispatch_end_to_end() { + // Pool: 10.0.0.0/29; server uses .1; .2/.3 will be handed to client-a / client-b dynamically. + let pool = Arc::new( + IpPool::new( + netw("10.0.0.0/29"), + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.0.0.1"), + ) + .expect("pool"), + ); + + // Wire the mocks. + let (tun_in_tx, tun_in_rx) = mpsc::channel::>(8); + let (tun_out_tx, mut tun_out_rx) = mpsc::channel::>(8); + let tun = MockTun { + inbound: tun_in_rx, + written: tun_out_tx, + }; + + // Build the router, capture handles to its routes + inbound channel, then spawn the run-loop. + let router = ServerRouter::new(tun, Arc::clone(&pool)); + let routes = router.routes(); + let inbound_tx = router.inbound_sender(); + let run = tokio::spawn(router.run()); + + // Allocate two clients and register their connections. 
+ let ip_a = pool.assign("client-a").await.expect("ip-a"); + let ip_b = pool.assign("client-b").await.expect("ip-b"); + assert_ne!(ip_a, ip_b); + + let (sent_a_tx, mut sent_a_rx) = mpsc::channel::>(8); + let (recv_a_tx, recv_a_rx) = mpsc::channel::>(8); + let conn_a: Arc = Arc::new(MockConn { + sent: sent_a_tx, + to_recv: Mutex::new(recv_a_rx), + }); + let (sent_b_tx, mut sent_b_rx) = mpsc::channel::>(8); + let (recv_b_tx, recv_b_rx) = mpsc::channel::>(8); + let conn_b: Arc = Arc::new(MockConn { + sent: sent_b_tx, + to_recv: Mutex::new(recv_b_rx), + }); + routes.register(ip_a, Arc::clone(&conn_a)).await; + routes.register(ip_b, Arc::clone(&conn_b)).await; + + // Per-conn inbound forwarders push received packets onto the shared TUN. + let _fa = ServerRouter::::spawn_inbound_forwarder( + routes.clone(), + inbound_tx.clone(), + Arc::clone(&conn_a), + ip_a, + Some("client-a".into()), + ); + let _fb = ServerRouter::::spawn_inbound_forwarder( + routes.clone(), + inbound_tx, + Arc::clone(&conn_b), + ip_b, + Some("client-b".into()), + ); + + // (a) TUN -> client-a: a packet to ip_a must arrive on conn-a's send channel verbatim. + let pkt_to_a = match ip_a { + IpAddr::V4(v4) => ipv4_packet_to(v4), + _ => panic!("ipv4 expected"), + }; + tun_in_tx.send(pkt_to_a.clone()).await.unwrap(); + let got = tokio::time::timeout(Duration::from_secs(2), sent_a_rx.recv()) + .await + .expect("conn-a did not receive the packet in time") + .expect("conn-a send channel closed"); + assert_eq!(got, pkt_to_a); + + // (b) TUN -> client-b. + let pkt_to_b = match ip_b { + IpAddr::V4(v4) => ipv4_packet_to(v4), + _ => panic!("ipv4 expected"), + }; + tun_in_tx.send(pkt_to_b.clone()).await.unwrap(); + let got = tokio::time::timeout(Duration::from_secs(2), sent_b_rx.recv()) + .await + .expect("conn-b did not receive the packet in time") + .expect("conn-b send channel closed"); + assert_eq!(got, pkt_to_b); + + // (c) Unknown destination: nobody is registered for .99, so it is silently dropped. 
+ let pkt_to_unknown = ipv4_packet_to(Ipv4Addr::new(10, 0, 0, 99)); + tun_in_tx.send(pkt_to_unknown).await.unwrap(); + // Give the router a moment, then assert nothing reached either connection. + let res = tokio::time::timeout(Duration::from_millis(200), sent_a_rx.recv()).await; + assert!(res.is_err(), "no packet should reach conn-a"); + let res = tokio::time::timeout(Duration::from_millis(200), sent_b_rx.recv()).await; + assert!(res.is_err(), "no packet should reach conn-b"); + + // (d) Inbound conn-a -> TUN. + let in_pkt_a = ipv4_packet_to(Ipv4Addr::new(8, 8, 8, 8)); + recv_a_tx.send(in_pkt_a.clone()).await.unwrap(); + let written = tokio::time::timeout(Duration::from_secs(2), tun_out_rx.recv()) + .await + .expect("router did not write inbound to TUN in time") + .expect("TUN write channel closed"); + assert_eq!(written, in_pkt_a); + + // (e) Inbound conn-b -> same TUN. + let in_pkt_b = ipv4_packet_to(Ipv4Addr::new(1, 1, 1, 1)); + recv_b_tx.send(in_pkt_b.clone()).await.unwrap(); + let written = tokio::time::timeout(Duration::from_secs(2), tun_out_rx.recv()) + .await + .expect("inbound conn-b did not reach TUN in time") + .expect("TUN write channel closed"); + assert_eq!(written, in_pkt_b); + + // Tear down: closing the TUN read side exits the run-loop. + drop(tun_in_tx); + let _ = tokio::time::timeout(Duration::from_secs(2), run).await; +} + +/// Per-conn task ending must release the IP and unregister the client. 
+#[tokio::test] +async fn per_conn_disconnect_releases_ip() { + let pool = Arc::new( + IpPool::new( + netw("10.0.0.0/30"), // only .2 is usable + PoolStrategy::DynamicOnly, + HashMap::new(), + ip("10.0.0.1"), + ) + .expect("pool"), + ); + let (_tun_in_tx, tun_in_rx) = mpsc::channel::>(8); + let (tun_out_tx, _tun_out_rx) = mpsc::channel::>(8); + let tun = MockTun { + inbound: tun_in_rx, + written: tun_out_tx, + }; + let router = ServerRouter::new(tun, Arc::clone(&pool)); + let routes = router.routes(); + let inbound_tx = router.inbound_sender(); + let _run = tokio::spawn(router.run()); + + let ip_a = pool.assign("alice").await.expect("the only usable ip"); + + let (sent_tx, _sent_rx) = mpsc::channel::>(8); + let (recv_tx, recv_rx) = mpsc::channel::>(8); + let conn: Arc = Arc::new(MockConn { + sent: sent_tx, + to_recv: Mutex::new(recv_rx), + }); + routes.register(ip_a, Arc::clone(&conn)).await; + let fwd = ServerRouter::::spawn_inbound_forwarder( + routes.clone(), + inbound_tx, + Arc::clone(&conn), + ip_a, + Some("alice".into()), + ); + + // Cause `recv_packet` to fail, ending the per-conn task. + drop(recv_tx); + let _ = tokio::time::timeout(Duration::from_secs(2), fwd).await; + + // The pool should be back to a fresh state. + assert!( + !routes.snapshot_ips().await.contains(&ip_a), + "stale entry must be unregistered" + ); + assert_eq!( + pool.assign("bob").await, + Some(ip_a), + "released IP must be reusable" + ); +}