//! Integration tests for the UDP **cover-traffic** (idle-chaff) feature. //! //! These exercise the end-to-end behaviour of [`UdpOpts::cover_traffic_enabled`] / //! [`UdpOpts::cover_mean_interval_ms`] / [`UdpOpts::cover_jitter`]: //! //! * [`udp_cover_traffic_fires_on_idle`] — cover enabled on the client; with no user data, the //! server's `recv_packet` returns at least one Pong-flavoured artefact (a Pong sealed by the //! server in response to the client's cover Ping is what the client would have seen if it called //! `recv_packet`). We instead drive a known-payload round trip *after* an idle window to assert //! that things keep working even when the cover task is running. //! * [`udp_cover_traffic_skipped_under_load`] — drive 50 packets in ≈1 second; the cover task must //! skip every attempt (link is non-idle), so the total number of datagrams the server observes is //! ≈ 50, not noticeably more. //! * [`udp_cover_traffic_disabled_back_compat`] — defaults give exactly the pre-feature wire //! silence: no extra Pings, no extra Pongs after a 1 s idle window. use std::net::SocketAddr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; use aura_pki::AuraCa; use aura_proto::{ClientConfig, PacketConnection, ServerConfig}; use aura_transport::{UdpClient, UdpConnection, UdpOpts, UdpServer}; use tokio::net::UdpSocket; const SERVER_NAME: &str = "localhost"; const CLIENT_ID: &str = "client-cover"; fn make_configs() -> (ServerConfig, ClientConfig) { let ca = AuraCa::generate("Aura UDP Cover Test CA").expect("generate CA"); let server_cert = ca .issue_server_cert(SERVER_NAME) .expect("issue server cert"); let client_cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert"); let ca_pem = ca.ca_cert_pem(); let server_cfg = ServerConfig { ca_cert_pem: ca_pem.clone(), server_cert_pem: server_cert.cert_pem, server_key_pem: server_cert.key_pem, }; let client_cfg = ClientConfig { ca_cert_pem: ca_pem, client_cert_pem: client_cert.cert_pem, client_key_pem: client_cert.key_pem, server_name: SERVER_NAME.to_string(), }; (server_cfg, client_cfg) } /// A datagram-counting forwarder that relays UDP between a "front" (clients connect here) and a /// "back" (real server), counting each direction. We use this to observe wire-level traffic /// directly — cover-Ping → cover-Pong both pass through, so the counters reflect total chaff. async fn spawn_counting_forwarder( server_addr: SocketAddr, ) -> (SocketAddr, Arc, Arc) { let front = UdpSocket::bind("127.0.0.1:0").await.expect("bind front"); let back = UdpSocket::bind("127.0.0.1:0").await.expect("bind back"); let front_addr = front.local_addr().expect("front addr"); let front = Arc::new(front); let back = Arc::new(back); let c2s = Arc::new(AtomicU64::new(0)); let s2c = Arc::new(AtomicU64::new(0)); let client_addr: Arc>> = Arc::new(tokio::sync::Mutex::new(None)); // Client -> Server. { let front = front.clone(); let back = back.clone(); let c2s = c2s.clone(); let client_addr = client_addr.clone(); tokio::spawn(async move { let mut buf = vec![0u8; 4096]; loop { let (n, from) = match front.recv_from(&mut buf).await { Ok(v) => v, Err(_) => continue, }; { let mut ca = client_addr.lock().await; if ca.is_none() { *ca = Some(from); } } c2s.fetch_add(1, Ordering::Relaxed); let _ = back.send_to(&buf[..n], server_addr).await; } }); } // Server -> Client. { let front = front.clone(); let back = back.clone(); let s2c = s2c.clone(); let client_addr = client_addr.clone(); tokio::spawn(async move { let mut buf = vec![0u8; 4096]; loop { let (n, _) = match back.recv_from(&mut buf).await { Ok(v) => v, Err(_) => continue, }; let dest = { *client_addr.lock().await }; if let Some(dest) = dest { s2c.fetch_add(1, Ordering::Relaxed); let _ = front.send_to(&buf[..n], dest).await; } } }); } (front_addr, c2s, s2c) } #[tokio::test] async fn udp_cover_traffic_fires_on_idle() { let (server_cfg, client_cfg) = make_configs(); // Tight cover interval so the test does not have to wait long for chaff to show up. let opts = UdpOpts { cover_traffic_enabled: true, cover_mean_interval_ms: 100, cover_jitter: 0.2, ..UdpOpts::default() }; let server = UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server"); let server_addr = server.local_addr().expect("server addr"); // Put the counting forwarder between client and server so we can see chaff on the wire. let (proxy_addr, c2s, s2c) = spawn_counting_forwarder(server_addr).await; let accept_task = tokio::spawn(async move { server.accept().await }); let connect_task = tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await }); let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) .await .expect("accept timely") .expect("accept join") .expect("server accept"); let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) .await .expect("connect timely") .expect("connect join") .expect("client connect"); let server_conn: Arc = Arc::new(server_conn); let client_conn: Arc = Arc::new(client_conn); // Snapshot counters after the handshake completes, then go idle for ~1.5 s. let c2s_after_hs = c2s.load(Ordering::Relaxed); let s2c_after_hs = s2c.load(Ordering::Relaxed); // Spawn a recv loop on the server side so it actually drives its read path (and replies to // Pings). Without this, the client's cover Pings would sit in the OS socket buffer. let server_for_recv = server_conn.clone(); let recv_task = tokio::spawn(async move { for _ in 0..20 { // Each call returns on Data; cover Pings are answered internally without ever // returning. So if we hit a timeout, that's expected: cover traffic does NOT surface // as data. We're really just running the loop so the server processes Pings. let _ = tokio::time::timeout(Duration::from_millis(300), server_for_recv.recv_packet()) .await; } }); // Mirror on the client side: keep its recv path active so it consumes Pongs (otherwise the // OS recv buffer fills with Pongs and the test sees them only on the wire counter, but the // cover-task's own state stays clean). let client_for_recv = client_conn.clone(); let recv_task_c = tokio::spawn(async move { for _ in 0..20 { let _ = tokio::time::timeout(Duration::from_millis(300), client_for_recv.recv_packet()) .await; } }); tokio::time::sleep(Duration::from_millis(1500)).await; let c2s_idle = c2s.load(Ordering::Relaxed); let s2c_idle = s2c.load(Ordering::Relaxed); let c2s_chaff = c2s_idle.saturating_sub(c2s_after_hs); let s2c_chaff = s2c_idle.saturating_sub(s2c_after_hs); // With cover_mean_interval_ms = 100 over ~1.5 s, we should see > 0 chaff datagrams in each // direction (client sends Pings; server replies with Pongs). We do not pin a tight bound to // avoid flake on slow CI; even one chaff datagram per direction proves the feature is firing. assert!( c2s_chaff >= 1, "expected at least one cover-traffic Ping client→server, got {c2s_chaff} (handshake baseline {c2s_after_hs})", ); assert!( s2c_chaff >= 1, "expected at least one cover-traffic Pong server→client, got {s2c_chaff} (handshake baseline {s2c_after_hs})", ); drop(recv_task); drop(recv_task_c); } #[tokio::test] async fn udp_cover_traffic_skipped_under_load() { let (server_cfg, client_cfg) = make_configs(); // Cover-task mean interval ~50 ms; user traffic must suppress chaff one-for-one because each // send updates `last_send_ms`. let opts = UdpOpts { cover_traffic_enabled: true, cover_mean_interval_ms: 50, cover_jitter: 0.1, ..UdpOpts::default() }; let server = UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server"); let server_addr = server.local_addr().expect("server addr"); let (proxy_addr, c2s, _s2c) = spawn_counting_forwarder(server_addr).await; let accept_task = tokio::spawn(async move { server.accept().await }); let connect_task = tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await }); let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) .await .expect("accept timely") .expect("accept join") .expect("server accept"); let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) .await .expect("connect timely") .expect("connect join") .expect("client connect"); let server_conn: Arc = Arc::new(server_conn); let client_conn: Arc = Arc::new(client_conn); // Drain the server in the background so it actually processes incoming packets (and Pings). let server_for_recv = server_conn.clone(); let recv_task = tokio::spawn(async move { let mut got = 0u32; while let Ok(Ok(_)) = tokio::time::timeout(Duration::from_millis(2000), server_for_recv.recv_packet()).await { got += 1; if got >= 50 { break; } } got }); let c2s_after_hs = c2s.load(Ordering::Relaxed); // Drive 50 user packets over ≈1 s. Each send updates last_send_ms, so the cover task's idle // check (`now - last < delay_ms`) is true on every iteration and chaff is skipped. let total = 50u32; let pkt = vec![0xABu8; 100]; let start = std::time::Instant::now(); for _ in 0..total { client_conn.send_packet(&pkt).await.expect("client send"); // Pace the sends to ~1 packet every 20 ms (50 packets in 1 s); spacing < mean interval // (50 ms) so the suppression check always wins. tokio::time::sleep(Duration::from_millis(20)).await; } let elapsed = start.elapsed(); assert!( elapsed < Duration::from_secs(3), "loop should finish under 3 s, took {elapsed:?}", ); // Give the server a moment to drain. let _ = tokio::time::timeout(Duration::from_secs(3), recv_task).await; let c2s_data = c2s.load(Ordering::Relaxed).saturating_sub(c2s_after_hs); // Expect ≈ 50 datagrams from client to server (the user packets). Allow a small slack for one // straggler cover Ping if a sleep wakes up just slightly late; tightly bound at ≤ 60. assert!( c2s_data >= total as u64, "client must have sent at least {total} user packets, observed {c2s_data}", ); assert!( c2s_data <= (total as u64) + 10, "cover-traffic should be suppressed under load, but observed {c2s_data} datagrams (expected ≈ {total})", ); } #[tokio::test] async fn udp_cover_traffic_disabled_back_compat() { let (server_cfg, client_cfg) = make_configs(); let opts = UdpOpts::default(); // cover_traffic_enabled: false let server = UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server"); let server_addr = server.local_addr().expect("server addr"); let (proxy_addr, c2s, s2c) = spawn_counting_forwarder(server_addr).await; let accept_task = tokio::spawn(async move { server.accept().await }); let connect_task = tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await }); let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) .await .expect("accept timely") .expect("accept join") .expect("server accept"); let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) .await .expect("connect timely") .expect("connect join") .expect("client connect"); // After the handshake, both sides go fully idle for 1 s. Nothing must hit the wire. let c2s_after_hs = c2s.load(Ordering::Relaxed); let s2c_after_hs = s2c.load(Ordering::Relaxed); // Hold references so the connections do not drop early. let _hold_server = server_conn; let _hold_client = client_conn; tokio::time::sleep(Duration::from_secs(1)).await; assert_eq!( c2s.load(Ordering::Relaxed), c2s_after_hs, "no client→server chaff with cover disabled", ); assert_eq!( s2c.load(Ordering::Relaxed), s2c_after_hs, "no server→client chaff with cover disabled", ); }