//! Integration tests for the server-side per-client routing path //! ([`aura_cli::server_router::ServerRouter`] + [`aura_cli::pool::IpPool`]). //! //! These tests do not bind any sockets and do not need root: they wire mock //! [`PacketIo`](aura_tunnel::PacketIo) and [`PacketConnection`](aura_proto::PacketConnection) //! implementations into the router and drive both directions through channels. use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr}; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use aura_cli::pool::{IpPool, PoolStrategy}; use aura_cli::server_router::ServerRouter; use aura_proto::PacketConnection; use aura_tunnel::PacketIo; use ipnetwork::IpNetwork; use tokio::sync::{mpsc, Mutex}; // ---- in-memory mocks --------------------------------------------------------------------------- /// Fake TUN backed by mpsc channels; identical to the one in aura-tunnel's routes test. struct MockTun { inbound: mpsc::Receiver>, written: mpsc::Sender>, } #[async_trait] impl PacketIo for MockTun { async fn read_packet(&mut self) -> std::io::Result> { match self.inbound.recv().await { Some(pkt) => Ok(pkt), None => Err(std::io::Error::new( std::io::ErrorKind::UnexpectedEof, "mock TUN closed", )), } } async fn write_packet(&mut self, packet: &[u8]) -> std::io::Result<()> { self.written .send(packet.to_vec()) .await .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "test dropped")) } } /// Fake encrypted connection backed by mpsc. struct MockConn { sent: mpsc::Sender>, to_recv: Mutex>>, } #[async_trait] impl PacketConnection for MockConn { async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> { self.sent.send(packet.to_vec()).await?; Ok(()) } async fn recv_packet(&self) -> anyhow::Result> { let mut rx = self.to_recv.lock().await; rx.recv() .await .ok_or_else(|| anyhow::anyhow!("mock conn closed")) } } fn ipv4_packet_to(dst: Ipv4Addr) -> Vec { let mut pkt = vec![0u8; 20]; pkt[0] = 0x45; let o = dst.octets(); pkt[16..20].copy_from_slice(&o); pkt } fn ip(s: &str) -> IpAddr { s.parse().unwrap() } fn netw(s: &str) -> IpNetwork { s.parse().unwrap() } // ---- the main scenario ------------------------------------------------------------------------- /// Two clients, one TUN: each TUN packet must reach the right connection, and inbound packets /// from either connection must reach the TUN. An unknown destination IP is silently dropped. #[tokio::test] async fn server_router_per_client_dispatch_end_to_end() { // Pool: 10.0.0.0/29; server uses .1; .2/.3 will be handed to client-a / client-b dynamically. let pool = Arc::new( IpPool::new( netw("10.0.0.0/29"), PoolStrategy::DynamicOnly, HashMap::new(), ip("10.0.0.1"), ) .expect("pool"), ); // Wire the mocks. let (tun_in_tx, tun_in_rx) = mpsc::channel::>(8); let (tun_out_tx, mut tun_out_rx) = mpsc::channel::>(8); let tun = MockTun { inbound: tun_in_rx, written: tun_out_tx, }; // Build the router, capture handles to its routes + inbound channel, then spawn the run-loop. let router = ServerRouter::new(tun, Arc::clone(&pool)); let routes = router.routes(); let inbound_tx = router.inbound_sender(); let run = tokio::spawn(router.run()); // Allocate two clients and register their connections. let ip_a = pool.assign("client-a").await.expect("ip-a"); let ip_b = pool.assign("client-b").await.expect("ip-b"); assert_ne!(ip_a, ip_b); let (sent_a_tx, mut sent_a_rx) = mpsc::channel::>(8); let (recv_a_tx, recv_a_rx) = mpsc::channel::>(8); let conn_a: Arc = Arc::new(MockConn { sent: sent_a_tx, to_recv: Mutex::new(recv_a_rx), }); let (sent_b_tx, mut sent_b_rx) = mpsc::channel::>(8); let (recv_b_tx, recv_b_rx) = mpsc::channel::>(8); let conn_b: Arc = Arc::new(MockConn { sent: sent_b_tx, to_recv: Mutex::new(recv_b_rx), }); routes.register(ip_a, Arc::clone(&conn_a)).await; routes.register(ip_b, Arc::clone(&conn_b)).await; // Per-conn inbound forwarders push received packets onto the shared TUN. let _fa = ServerRouter::::spawn_inbound_forwarder( routes.clone(), inbound_tx.clone(), Arc::clone(&conn_a), ip_a, Some("client-a".into()), ); let _fb = ServerRouter::::spawn_inbound_forwarder( routes.clone(), inbound_tx, Arc::clone(&conn_b), ip_b, Some("client-b".into()), ); // (a) TUN -> client-a: a packet to ip_a must arrive on conn-a's send channel verbatim. let pkt_to_a = match ip_a { IpAddr::V4(v4) => ipv4_packet_to(v4), _ => panic!("ipv4 expected"), }; tun_in_tx.send(pkt_to_a.clone()).await.unwrap(); let got = tokio::time::timeout(Duration::from_secs(2), sent_a_rx.recv()) .await .expect("conn-a did not receive the packet in time") .expect("conn-a send channel closed"); assert_eq!(got, pkt_to_a); // (b) TUN -> client-b. let pkt_to_b = match ip_b { IpAddr::V4(v4) => ipv4_packet_to(v4), _ => panic!("ipv4 expected"), }; tun_in_tx.send(pkt_to_b.clone()).await.unwrap(); let got = tokio::time::timeout(Duration::from_secs(2), sent_b_rx.recv()) .await .expect("conn-b did not receive the packet in time") .expect("conn-b send channel closed"); assert_eq!(got, pkt_to_b); // (c) Unknown destination: nobody is registered for .99, so it is silently dropped. let pkt_to_unknown = ipv4_packet_to(Ipv4Addr::new(10, 0, 0, 99)); tun_in_tx.send(pkt_to_unknown).await.unwrap(); // Give the router a moment, then assert nothing reached either connection. let res = tokio::time::timeout(Duration::from_millis(200), sent_a_rx.recv()).await; assert!(res.is_err(), "no packet should reach conn-a"); let res = tokio::time::timeout(Duration::from_millis(200), sent_b_rx.recv()).await; assert!(res.is_err(), "no packet should reach conn-b"); // (d) Inbound conn-a -> TUN. let in_pkt_a = ipv4_packet_to(Ipv4Addr::new(8, 8, 8, 8)); recv_a_tx.send(in_pkt_a.clone()).await.unwrap(); let written = tokio::time::timeout(Duration::from_secs(2), tun_out_rx.recv()) .await .expect("router did not write inbound to TUN in time") .expect("TUN write channel closed"); assert_eq!(written, in_pkt_a); // (e) Inbound conn-b -> same TUN. let in_pkt_b = ipv4_packet_to(Ipv4Addr::new(1, 1, 1, 1)); recv_b_tx.send(in_pkt_b.clone()).await.unwrap(); let written = tokio::time::timeout(Duration::from_secs(2), tun_out_rx.recv()) .await .expect("inbound conn-b did not reach TUN in time") .expect("TUN write channel closed"); assert_eq!(written, in_pkt_b); // Tear down: closing the TUN read side exits the run-loop. drop(tun_in_tx); let _ = tokio::time::timeout(Duration::from_secs(2), run).await; } /// Per-conn task ending must release the IP and unregister the client. #[tokio::test] async fn per_conn_disconnect_releases_ip() { let pool = Arc::new( IpPool::new( netw("10.0.0.0/30"), // only .2 is usable PoolStrategy::DynamicOnly, HashMap::new(), ip("10.0.0.1"), ) .expect("pool"), ); let (_tun_in_tx, tun_in_rx) = mpsc::channel::>(8); let (tun_out_tx, _tun_out_rx) = mpsc::channel::>(8); let tun = MockTun { inbound: tun_in_rx, written: tun_out_tx, }; let router = ServerRouter::new(tun, Arc::clone(&pool)); let routes = router.routes(); let inbound_tx = router.inbound_sender(); let _run = tokio::spawn(router.run()); let ip_a = pool.assign("alice").await.expect("the only usable ip"); let (sent_tx, _sent_rx) = mpsc::channel::>(8); let (recv_tx, recv_rx) = mpsc::channel::>(8); let conn: Arc = Arc::new(MockConn { sent: sent_tx, to_recv: Mutex::new(recv_rx), }); routes.register(ip_a, Arc::clone(&conn)).await; let fwd = ServerRouter::::spawn_inbound_forwarder( routes.clone(), inbound_tx, Arc::clone(&conn), ip_a, Some("alice".into()), ); // Cause `recv_packet` to fail, ending the per-conn task. drop(recv_tx); let _ = tokio::time::timeout(Duration::from_secs(2), fwd).await; // The pool should be back to a fresh state. assert!( !routes.snapshot_ips().await.contains(&ip_a), "stale entry must be unregistered" ); assert_eq!( pool.assign("bob").await, Some(ip_a), "released IP must be reusable" ); }