//! Server-side per-client routing (v2 §18): one TUN, many connections, IP-pool dispatch.
//!
//! The v1 server bridged each accepted connection onto its own short-lived TUN, which only
//! supported a single active client at a time. v2 keeps **one** server-side TUN and
//! demultiplexes traffic to/from many concurrent clients, using the [`IpPool`]-allocated client
//! IP as the routing key:
//!
//! * **TUN read -> conn send** (outbound, server -> peers): the TUN-owner task reads packets via
//!   `tun.read_packet()`. For each packet it parses the destination IP, looks up the matching
//!   client connection in the `by_ip` map, and `send_packet`s the packet on that connection.
//!   Packets for unknown destinations are dropped with a trace.
//! * **conn recv -> TUN write** (inbound, peers -> server): every accepted client gets a
//!   dedicated task that loops on `conn.recv_packet().await` and forwards the result into a
//!   single mpsc channel; the same TUN-owner task drains that channel and writes the packets to
//!   the TUN.
//!
//! Splitting a single `PacketIo` between a read loop and a write loop is awkward because both
//! halves take `&mut self`. Just like [`aura_tunnel::AuraRouter`], the router therefore parks the
//! TUN behind one `select!` loop that handles both directions on the same task: a TUN read races
//! against a write coming off the inbound mpsc.
//!
//! The router is generic over [`aura_tunnel::PacketIo`] so tests can substitute a `MockTun` and
//! drive both halves without root.

use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;

use aura_proto::PacketConnection;
use aura_tunnel::router::dst_ip;
use aura_tunnel::{PacketCounters, PacketIo};
use tokio::sync::{mpsc, RwLock};

use crate::pool::IpPool;

/// Bounded capacity of the per-server inbound mpsc that the per-conn tasks fan into.
///
/// Large enough to absorb short bursts from many clients, yet bounded so that a stalled TUN
/// pushes backpressure onto the per-conn tasks instead of buffering without limit. The TUN owner
/// drains continuously, so this only fills when the kernel is briefly behind on writes.
const INBOUND_CAPACITY: usize = 4096; /// Per-client routing state shared between the registration path (accept-loop) and the data path /// (TUN owner / per-conn tasks). /// /// Cheap to clone (it is `Arc`-shaped internally) so the server's accept-loop and per-connection /// tasks can all hold references to the same dispatch map. #[derive(Clone)] pub struct ServerRoutes { /// Live `client_ip -> connection` map; the TUN owner reads, accept/disconnect mutate. by_ip: Arc>>>, /// IP pool the server uses to assign and release client addresses. pool: Arc, } impl ServerRoutes { /// Build empty per-server routes over the given [`IpPool`]. pub fn new(pool: Arc) -> Self { Self { by_ip: Arc::new(RwLock::new(HashMap::new())), pool, } } /// Borrow the [`IpPool`] for assign/release calls. pub fn pool(&self) -> &Arc { &self.pool } /// Register a connection under `ip`. Returns the previously registered connection (if any), /// which the caller should treat as a forced-disconnect race. pub async fn register( &self, ip: IpAddr, conn: Arc, ) -> Option> { self.by_ip.write().await.insert(ip, conn) } /// Remove the connection registered under `ip`. pub async fn unregister(&self, ip: IpAddr) { self.by_ip.write().await.remove(&ip); } /// Snapshot the current dispatch map (mainly for tests / observability). pub async fn snapshot_ips(&self) -> Vec { self.by_ip.read().await.keys().copied().collect() } /// Send a packet to the connection registered under `dst`. Returns `Ok(true)` if the packet /// was queued to a connection; `Ok(false)` if no connection is registered under `dst`. Errors /// from the connection's `send_packet` surface verbatim — the caller decides whether to /// evict the client. pub async fn dispatch(&self, dst: IpAddr, pkt: &[u8]) -> anyhow::Result { let conn = { let map = self.by_ip.read().await; map.get(&dst).cloned() }; match conn { Some(c) => { c.send_packet(pkt).await?; Ok(true) } None => Ok(false), } } } /// The per-server "one TUN, many clients" router. 
/// /// Owns a [`PacketIo`] device and a [`ServerRoutes`] dispatch table. [`Self::run`] drains /// outbound TUN reads into the right connection, and inbound packets from any registered /// connection to the single TUN. /// /// The accept-loop drives [`Self::handle_connection`] from each `MultiServer::accept` so per-conn /// state is created up-front and torn down when the connection ends. pub struct ServerRouter { tun: P, routes: ServerRoutes, /// All per-conn tasks fan their inbound packets into this single channel; the owner of `tun` /// drains the receiver. inbound_tx: mpsc::Sender>, inbound_rx: mpsc::Receiver>, /// Optional packet counters bumped on every server-side TUN tx/rx. Tx counts packets the /// server read from its own TUN and dispatched to a client; rx counts packets a client sent /// that were successfully written back to the TUN. Wired to the admin `Stats` so `aura status` /// reports live numbers. `None` skips the atomic ops entirely. counters: Option, } impl ServerRouter

{ /// Build a fresh router with empty routes and the given pool. /// /// No stats are recorded. Use [`Self::with_stats`] if `aura status` should see live counters. pub fn new(tun: P, pool: Arc) -> Self { Self::from_routes(tun, ServerRoutes::new(pool)) } /// Like [`Self::new`] but also wires in [`PacketCounters`] for the admin socket. pub fn with_stats(tun: P, pool: Arc, counters: Option) -> Self { Self::from_routes_with_stats(tun, ServerRoutes::new(pool), counters) } /// Build a router from an existing [`ServerRoutes`] (mainly for tests that pre-seed routes). pub fn from_routes(tun: P, routes: ServerRoutes) -> Self { Self::from_routes_with_stats(tun, routes, None) } /// Like [`Self::from_routes`] but also takes the shared admin counters. pub fn from_routes_with_stats( tun: P, routes: ServerRoutes, counters: Option, ) -> Self { let (inbound_tx, inbound_rx) = mpsc::channel::>(INBOUND_CAPACITY); Self { tun, routes, inbound_tx, inbound_rx, counters, } } /// Clone the shared routes (so the accept-loop can call [`Self::spawn_connection`] from /// outside the router task). pub fn routes(&self) -> ServerRoutes { self.routes.clone() } /// Clone the inbound sender so the accept-loop / handlers can spawn per-conn forwarder tasks /// without holding a reference to the router. pub fn inbound_sender(&self) -> mpsc::Sender> { self.inbound_tx.clone() } /// Spawn the per-connection inbound forwarder. /// /// Loops `conn.recv_packet()` and forwards each packet into the shared `inbound_tx`. When the /// connection errors out (peer closed, transport error, etc.) the function unregisters `ip` /// from the routes and releases it back to the pool. 
pub fn spawn_inbound_forwarder( routes: ServerRoutes, inbound_tx: mpsc::Sender>, conn: Arc, ip: IpAddr, peer_id: Option, ) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { loop { match conn.recv_packet().await { Ok(pkt) => { if inbound_tx.send(pkt).await.is_err() { tracing::debug!( %ip, peer = ?peer_id, "server router inbound channel closed; ending per-conn task" ); break; } } Err(e) => { tracing::info!( %ip, peer = ?peer_id, error = %e, "client connection ended; releasing ip and unregistering" ); break; } } } routes.unregister(ip).await; routes.pool().release(ip).await; }) } /// Run the TUN owner loop: read packets from the TUN, dispatch to the right connection; /// write packets coming in from the inbound channel to the TUN. Returns when either half /// fails. pub async fn run(mut self) -> anyhow::Result<()> { loop { tokio::select! { read = self.tun.read_packet() => { match read { Ok(pkt) => { if let Err(e) = self.route_outbound(&pkt).await { // Connection-level errors are non-fatal for the router: log and // keep serving other clients (the per-conn task will see its own // error path and release the IP). tracing::warn!(error = %e, "server router outbound send failed"); } } Err(e) => { return Err(anyhow::Error::new(e).context("server TUN read failed")); } } } maybe_pkt = self.inbound_rx.recv() => { match maybe_pkt { Some(pkt) => { if let Err(e) = self.tun.write_packet(&pkt).await { return Err(anyhow::Error::new(e).context("server TUN write failed")); } // Only count packets actually delivered to the server-side TUN. if let Some(c) = &self.counters { c.inc_rx(); } } None => { // All inbound senders dropped (the accept-loop and all per-conn // tasks). The router has no work left; exit. return Ok(()); } } } } } } /// Classify one outbound packet and dispatch it to the matching client connection. 
async fn route_outbound(&self, pkt: &[u8]) -> anyhow::Result<()> { let Some(dst) = dst_ip(pkt) else { tracing::trace!(len = pkt.len(), "dropping unparseable outbound packet"); return Ok(()); }; match self.routes.dispatch(dst, pkt).await? { true => { // Count packets that actually made it to a registered client connection. if let Some(c) = &self.counters { c.inc_tx(); } Ok(()) } false => { tracing::trace!(%dst, len = pkt.len(), "no client registered for destination; dropping"); Ok(()) } } } } #[cfg(test)] mod tests { use super::*; use std::collections::HashMap; use std::net::Ipv4Addr; use async_trait::async_trait; use tokio::sync::{mpsc, Mutex as TokMutex}; use crate::pool::PoolStrategy; /// In-memory fake TUN: `read_packet` drains an injected queue, `write_packet` forwards to a /// channel the test reads. Identical contract to the one in `aura-tunnel/tests/routes.rs`. struct MockTun { inbound: mpsc::Receiver>, written: mpsc::Sender>, } #[async_trait] impl PacketIo for MockTun { async fn read_packet(&mut self) -> std::io::Result> { match self.inbound.recv().await { Some(pkt) => Ok(pkt), None => Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, "mock TUN closed", )), } } async fn write_packet(&mut self, packet: &[u8]) -> std::io::Result<()> { self.written .send(packet.to_vec()) .await .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "test dropped")) } } /// Mock packet connection: `send_packet` forwards to a channel; `recv_packet` drains another. 
struct MockConn { sent: mpsc::Sender>, to_recv: TokMutex>>, } #[async_trait] impl PacketConnection for MockConn { async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> { self.sent.send(packet.to_vec()).await?; Ok(()) } async fn recv_packet(&self) -> anyhow::Result> { let mut rx = self.to_recv.lock().await; rx.recv() .await .ok_or_else(|| anyhow::anyhow!("mock conn closed")) } } fn ipv4_packet_to(dst: Ipv4Addr) -> Vec { let mut pkt = vec![0u8; 20]; pkt[0] = 0x45; let o = dst.octets(); pkt[16..20].copy_from_slice(&o); pkt } fn ip(s: &str) -> IpAddr { s.parse().unwrap() } fn net(s: &str) -> ipnetwork::IpNetwork { s.parse().unwrap() } /// End-to-end test of the server router's data path: /// * Two MockConns are registered under .2 and .3. /// * A TUN packet to .2 should reach conn-A; to .3 -> conn-B; to .99 -> dropped. /// * A packet sent into conn-A's recv channel should be written verbatim to the TUN. #[tokio::test] async fn server_router_dispatches_by_destination_ip() { // Build a pool with .1 reserved as the server's IP. let pool = Arc::new( IpPool::new( net("10.0.0.0/29"), PoolStrategy::DynamicOnly, HashMap::new(), ip("10.0.0.1"), ) .unwrap(), ); // Wire up the mock TUN and the router. let (tun_in_tx, tun_in_rx) = mpsc::channel::>(8); let (tun_out_tx, mut tun_out_rx) = mpsc::channel::>(8); let tun = MockTun { inbound: tun_in_rx, written: tun_out_tx, }; let router = ServerRouter::new(tun, pool.clone()); let routes = router.routes(); let inbound_tx = router.inbound_sender(); // Allocate two clients via the pool and register them under those IPs. 
let ip_a = pool.assign("client-a").await.expect("ip-a"); let ip_b = pool.assign("client-b").await.expect("ip-b"); let (sent_a_tx, mut sent_a_rx) = mpsc::channel::>(8); let (recv_a_tx, recv_a_rx) = mpsc::channel::>(8); let conn_a: Arc = Arc::new(MockConn { sent: sent_a_tx, to_recv: TokMutex::new(recv_a_rx), }); let (sent_b_tx, mut sent_b_rx) = mpsc::channel::>(8); let (recv_b_tx, recv_b_rx) = mpsc::channel::>(8); let conn_b: Arc = Arc::new(MockConn { sent: sent_b_tx, to_recv: TokMutex::new(recv_b_rx), }); routes.register(ip_a, Arc::clone(&conn_a)).await; routes.register(ip_b, Arc::clone(&conn_b)).await; // Spawn per-conn forwarders and the router itself. let _fwd_a = ServerRouter::::spawn_inbound_forwarder( routes.clone(), inbound_tx.clone(), Arc::clone(&conn_a), ip_a, Some("client-a".into()), ); let _fwd_b = ServerRouter::::spawn_inbound_forwarder( routes.clone(), inbound_tx, Arc::clone(&conn_b), ip_b, Some("client-b".into()), ); let handle = tokio::spawn(router.run()); // 1. TUN read -> dispatch to conn-A by destination = ip_a. let pkt_to_a = ipv4_packet_to(match ip_a { IpAddr::V4(v4) => v4, _ => panic!("ipv4 in this test"), }); tun_in_tx.send(pkt_to_a.clone()).await.unwrap(); let got = tokio::time::timeout(std::time::Duration::from_secs(2), sent_a_rx.recv()) .await .expect("router did not dispatch to client-a in time") .expect("client-a sent channel closed"); assert_eq!(got, pkt_to_a, "conn-a should receive the packet verbatim"); // 2. TUN read -> dispatch to conn-B by destination = ip_b. let pkt_to_b = ipv4_packet_to(match ip_b { IpAddr::V4(v4) => v4, _ => panic!("ipv4 in this test"), }); tun_in_tx.send(pkt_to_b.clone()).await.unwrap(); let got = tokio::time::timeout(std::time::Duration::from_secs(2), sent_b_rx.recv()) .await .expect("router did not dispatch to client-b in time") .expect("client-b sent channel closed"); assert_eq!(got, pkt_to_b); // 3. 
TUN read -> destination .99 has nobody registered; must be silently dropped (no // crash, nothing reaches either conn). let pkt_to_unknown = ipv4_packet_to(Ipv4Addr::new(10, 0, 0, 99)); tun_in_tx.send(pkt_to_unknown).await.unwrap(); let res = tokio::time::timeout(std::time::Duration::from_millis(200), sent_a_rx.recv()).await; assert!(res.is_err(), "no packet should reach conn-a"); let res = tokio::time::timeout(std::time::Duration::from_millis(200), sent_b_rx.recv()).await; assert!(res.is_err(), "no packet should reach conn-b"); // 4. Inbound: conn-A's recv channel -> TUN write. let in_pkt = ipv4_packet_to(Ipv4Addr::new(8, 8, 8, 8)); recv_a_tx.send(in_pkt.clone()).await.unwrap(); let written = tokio::time::timeout(std::time::Duration::from_secs(2), tun_out_rx.recv()) .await .expect("router did not forward inbound to TUN in time") .expect("TUN write channel closed"); assert_eq!(written, in_pkt); // 5. Inbound from conn-B: TUN must also receive it (one TUN, many sources). let in_pkt_b = ipv4_packet_to(Ipv4Addr::new(1, 1, 1, 1)); recv_b_tx.send(in_pkt_b.clone()).await.unwrap(); let written = tokio::time::timeout(std::time::Duration::from_secs(2), tun_out_rx.recv()) .await .expect("inbound-b did not reach TUN") .expect("TUN write channel closed"); assert_eq!(written, in_pkt_b); // Cleanup: closing the TUN read side bubbles an EOF up the run-loop. drop(tun_in_tx); let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await; } /// When a per-conn task ends (e.g. the connection errors), the router must release the IP /// back to the pool and remove the dispatch entry so a re-registration can succeed. 
#[tokio::test] async fn per_conn_task_releases_ip_and_unregisters_on_error() { let pool = Arc::new( IpPool::new( net("10.0.0.0/30"), // only .2 is dynamically usable PoolStrategy::DynamicOnly, HashMap::new(), ip("10.0.0.1"), ) .unwrap(), ); let (_tun_in_tx, tun_in_rx) = mpsc::channel::>(8); let (tun_out_tx, _tun_out_rx) = mpsc::channel::>(8); let tun = MockTun { inbound: tun_in_rx, written: tun_out_tx, }; let router = ServerRouter::new(tun, pool.clone()); let routes = router.routes(); let inbound_tx = router.inbound_sender(); let _handle = tokio::spawn(router.run()); let ip_a = pool.assign("client-a").await.expect("only usable ip"); let (sent_tx, _sent_rx) = mpsc::channel::>(8); let (recv_tx, recv_rx) = mpsc::channel::>(8); let conn: Arc = Arc::new(MockConn { sent: sent_tx, to_recv: TokMutex::new(recv_rx), }); routes.register(ip_a, Arc::clone(&conn)).await; let fwd = ServerRouter::::spawn_inbound_forwarder( routes.clone(), inbound_tx, Arc::clone(&conn), ip_a, Some("client-a".into()), ); // Drop the recv sender to make `conn.recv_packet()` return an error, which ends the task. drop(recv_tx); let _ = tokio::time::timeout(std::time::Duration::from_secs(2), fwd).await; // The IP must be releasable now (re-assigned without exhaustion). let next = pool.assign("client-b").await.expect("ip back in pool"); assert_eq!(next, ip_a, "released ip should be reusable"); // And the dispatch map should not still contain a stale entry under ip_a. let snap = routes.snapshot_ips().await; assert!( !snap.contains(&ip_a), "stale entry should have been unregistered" ); } }