diff --git a/Cargo.lock b/Cargo.lock index 99a7f0e..d09c651 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,11 +276,13 @@ dependencies = [ "aura-pki", "aura-proto", "bytes", + "hmac", "quinn", "rand 0.8.6", "rustls", "rustls-pemfile", "rustls-pki-types", + "sha2", "thiserror 1.0.69", "tokio", "tokio-rustls", diff --git a/crates/aura-transport/Cargo.toml b/crates/aura-transport/Cargo.toml index d1c93e3..966635d 100644 --- a/crates/aura-transport/Cargo.toml +++ b/crates/aura-transport/Cargo.toml @@ -25,6 +25,12 @@ rustls-pemfile = "2" # boundary is still the inner Aura handshake, just like for the QUIC backend). Local-only to this # crate — not a new workspace dependency. tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } +# HMAC-SHA256 for UDP port-knocking (probe resistance): the knock token is +# `HMAC(knock_key, u64_be(unix_minute))[..16]`, prefixed on every HS datagram when +# `UdpOpts::knock_required` is enabled. Both already resolved in the workspace lockfile (transitively +# via aura-crypto's deps tree), so no new version is introduced. +hmac = "0.12" +sha2 = "0.10" [dev-dependencies] # The loopback integration test mints a CA + server/client certs to drive a real QUIC handshake. diff --git a/crates/aura-transport/src/lib.rs b/crates/aura-transport/src/lib.rs index 06e387e..44c7217 100644 --- a/crates/aura-transport/src/lib.rs +++ b/crates/aura-transport/src/lib.rs @@ -80,7 +80,7 @@ pub use padding::{ }; pub use quic::{client_endpoint, server_endpoint, AcceptAnyServerCert}; pub use tcp::{TcpClient, TcpConnection, TcpOpts, TcpServer, DEFAULT_TCP_ALPN}; -pub use udp::{UdpClient, UdpConnection, UdpOpts, UdpServer}; +pub use udp::{knock_for_minute, UdpClient, UdpConnection, UdpOpts, UdpServer, KNOCK_LEN}; // Re-export the inner proto trait so downstream crates (the CLI) can name the connection as // `Arc` without a separate `aura_proto` import. diff --git a/crates/aura-transport/src/udp.rs b/crates/aura-transport/src/udp.rs index 38e7150..c60f952 100644 --- a/crates/aura-transport/src/udp.rs +++ b/crates/aura-transport/src/udp.rs @@ -50,6 +50,7 @@ use std::collections::{BTreeMap, HashMap}; use std::io; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; use std::time::Duration; @@ -96,15 +97,145 @@ const ACK_NONE: u16 = u16::MAX; /// ~1253 bytes; data records are MTU-sized; this leaves slack for headers + obfuscation padding). const RECV_BUF: usize = 2048; +/// Length of the port-knock token prefixed on each HS datagram when +/// [`UdpOpts::knock_required`] is enabled (truncated HMAC-SHA256 output). +pub const KNOCK_LEN: usize = 16; + +// --------------------------------------------------------------------------------------------- +// Time helpers + knock derivation +// --------------------------------------------------------------------------------------------- + +/// Current wall-clock minute since the Unix epoch (`floor(now_secs / 60)`). +/// +/// Returns 0 if the system clock is reported as before the epoch (extremely unusual; the knock +/// validator's ±1-minute window absorbs the resulting bucket on healthy peers). +fn current_unix_minute() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_secs() / 60) + .unwrap_or(0) +} + +/// Current wall-clock milliseconds since the Unix epoch, for the cover-traffic last-send timestamp. +fn unix_ms() -> u64 { + use std::time::{SystemTime, UNIX_EPOCH}; + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_millis() as u64) + .unwrap_or(0) +} + +/// Derive the 16-byte port-knock token for `minute` under the shared `key`. +/// +/// Wire formula: `HMAC-SHA256(key, u64_be(minute))[..16]`. The server validates against +/// [`current_unix_minute`] and ±1 to tolerate honest clock skew (≈3-minute acceptance window). +/// +/// Exposed primarily as a test seam (drive the validator with a fake minute) and so the CLI / a +/// future wire-probe tool can compute the same token; production code does not need to call it +/// directly because the adapter prefixes it on every HS datagram when +/// [`UdpOpts::knock_required`] is set. +pub fn knock_for_minute(key: &[u8; 32], minute: u64) -> [u8; KNOCK_LEN] { + use hmac::{Hmac, Mac}; + use sha2::Sha256; + let mut mac = as Mac>::new_from_slice(key) + .expect("HMAC accepts any key length, so a 32-byte slice cannot fail"); + mac.update(&minute.to_be_bytes()); + let tag = mac.finalize().into_bytes(); + let mut out = [0u8; KNOCK_LEN]; + out.copy_from_slice(&tag[..KNOCK_LEN]); + out +} + +/// Constant-time compare of two 16-byte knock tokens. Avoids leaking the index of the first +/// differing byte through timing — a defensive choice; the knock is a coarse probe-resistance +/// filter, not a per-byte secret, but a tight loop is just as cheap as a non-CT compare here. +fn ct_eq_knock(a: &[u8; KNOCK_LEN], b: &[u8; KNOCK_LEN]) -> bool { + let mut acc = 0u8; + for i in 0..KNOCK_LEN { + acc |= a[i] ^ b[i]; + } + acc == 0 +} + +/// Strip the knock prefix from a datagram from a **known** peer when knocking is on. Returns +/// `Some(stripped)` for valid wire layouts, `None` for malformed ones (which the master loop will +/// silently drop): +/// +/// * Empty datagram → `None`. +/// * `0x02 ...` (DATA) → passed through unchanged (DATA datagrams are never knock-prefixed). +/// * `knock(16) || 0x01 || ...` (HS, len ≥ 17) → returns the tail starting at the `0x01`. +/// * Anything else → `None`. +/// +/// We do **not** re-validate the knock on the already-known-peer path (per the spec: "На датаграмму +/// от известного пира — без проверки knock"). Once an address has registered via a valid first +/// knock, subsequent prefixes are trusted as a wire-format artefact, not a continuing auth check. +fn strip_knock_for_known_peer(dg: &[u8]) -> Option> { + if dg.is_empty() { + return None; + } + if dg[0] == TYPE_DATA { + return Some(dg.to_vec()); + } + if dg.len() > KNOCK_LEN && dg[KNOCK_LEN] == TYPE_HS { + return Some(dg[KNOCK_LEN..].to_vec()); + } + None +} + +/// Validate the leading 16-byte knock prefix against `HMAC(key, minute_be)[..16]` for the current +/// Unix-minute and ±1 (a ≈3-minute acceptance window), then return the stripped datagram (with the +/// type byte `TYPE_HS` at index 0). Returns `None` on any wire-format or HMAC failure — the caller +/// silently drops, so a passive probe sees no response. +fn validate_and_strip_knock(dg: &[u8], key: &[u8; 32]) -> Option> { + if dg.len() <= KNOCK_LEN || dg[KNOCK_LEN] != TYPE_HS { + return None; + } + let mut prefix = [0u8; KNOCK_LEN]; + prefix.copy_from_slice(&dg[..KNOCK_LEN]); + let now = current_unix_minute(); + // ±1 minute tolerance. Use saturating_sub to avoid wrapping at the epoch boundary. + let candidates = [now, now.saturating_sub(1), now.saturating_add(1)]; + for m in candidates { + let expected = knock_for_minute(key, m); + if ct_eq_knock(&prefix, &expected) { + return Some(dg[KNOCK_LEN..].to_vec()); + } + } + None +} + // --------------------------------------------------------------------------------------------- // Options // --------------------------------------------------------------------------------------------- -/// Tunables for the UDP transport (handshake reliability timers and obfuscation). +/// Tunables for the UDP transport (handshake reliability timers, obfuscation, and the two +/// anti-surveillance features). /// /// [`UdpOpts::default`] is a sensible production default: obfuscation off, a 250 ms retransmit -/// timeout, a 10 s overall handshake deadline, and padding profile `0` (the historical -/// [`HTTPS_SIZE_BUCKETS`](padding::HTTPS_SIZE_BUCKETS) palette). +/// timeout, a 10 s overall handshake deadline, padding profile `0` (the historical +/// [`HTTPS_SIZE_BUCKETS`](padding::HTTPS_SIZE_BUCKETS) palette), **knock disabled** and +/// **cover traffic disabled**. The two anti-surveillance toggles are opt-in so existing callers +/// keep the pre-feature wire behaviour without any changes. +/// +/// ## Probe resistance — UDP port-knocking +/// +/// When [`Self::knock_required`] is `true`, the client prefixes a 16-byte HMAC token on **every** +/// HS datagram it sends; the server silently drops any first datagram from an unknown source whose +/// prefix does not validate against the shared [`Self::knock_key`] for the current Unix-minute +/// (with ±1 minute tolerance for clock skew). To a passive scanner the listening UDP port looks +/// closed. The shared key is the SHA-256 of the Aura CA cert DER (the CLI computes it and supplies +/// it here; the transport just consumes the 32 bytes). +/// +/// ## Cover traffic — idle-time chaff +/// +/// When [`Self::cover_traffic_enabled`] is `true`, an established [`UdpConnection`] runs a +/// background task that injects encrypted [`Frame::Ping`]s during idle periods so the on-wire byte +/// rate stays roughly constant. The interval between attempts is +/// `cover_mean_interval_ms ± cover_jitter` (uniform), and an attempt is **skipped** if any DATA +/// datagram was sent within the previous interval (so user traffic suppresses chaff). The receiver +/// handles each cover Ping exactly like any other Ping (it answers with a Pong and keeps reading) +/// — no application-layer awareness needed. #[derive(Clone, Copy, Debug)] pub struct UdpOpts { /// When `true`, pad every outgoing DATA datagram up to the next bucket of the configured @@ -123,6 +254,30 @@ pub struct UdpOpts { /// How long the post-handshake linger task keeps resending the final flight (so the peer's last /// flight is not lost) before giving up if no DATA datagram arrives. pub hs_linger: Duration, + + // -- anti-surveillance: probe resistance ---------------------------------------------------- + /// When `true`, port-knocking is required on the server side and the client must prefix the + /// 16-byte knock token on every HS datagram (see the type-level "Probe resistance" docs). + /// `[Self::knock_key]` MUST be `Some(...)` when this is `true`; if it is not, both ends behave + /// as if knocking is off and no knock prefix is added or validated. Default `false` for + /// back-compat. + pub knock_required: bool, + /// Shared 32-byte key for the knock HMAC (typically `SHA-256(CA-cert-DER)`). Used only when + /// [`Self::knock_required`] is `true`. Default `None`. + pub knock_key: Option<[u8; 32]>, + + // -- anti-surveillance: cover traffic -------------------------------------------------------- + /// When `true`, after the handshake the [`UdpConnection`] spawns a background task that injects + /// encrypted [`Frame::Ping`]s during idle periods (see the type-level "Cover traffic" docs). + /// Default `false` for back-compat. + pub cover_traffic_enabled: bool, + /// Mean interval, in milliseconds, between cover-traffic attempts. Default `500`. Effective + /// only when [`Self::cover_traffic_enabled`] is `true`. + pub cover_mean_interval_ms: u64, + /// Uniform jitter fraction applied to [`Self::cover_mean_interval_ms`] (e.g. `0.5` gives + /// ±50%, so the effective interval is `mean * (1 ± 0.5)`). Clamped into `[0.0, 1.0)`. Default + /// `0.5`. + pub cover_jitter: f32, } impl Default for UdpOpts { @@ -133,6 +288,11 @@ impl Default for UdpOpts { hs_rto: Duration::from_millis(250), hs_timeout: Duration::from_secs(10), hs_linger: Duration::from_secs(2), + knock_required: false, + knock_key: None, + cover_traffic_enabled: false, + cover_mean_interval_ms: 500, + cover_jitter: 0.5, } } } @@ -261,6 +421,10 @@ struct ReliableHsAdapter { /// Signalled by `poll_write` when new bytes are buffered, so the driver flushes promptly without /// busy-polling. write_notify: Arc, + /// Optional port-knock key. When `Some`, **the client** prefixes every outgoing HS datagram with + /// the 16-byte `knock_for_minute(key, current_unix_minute())` token (probe resistance). Set + /// only on the client side (the server never knocks back); always `None` on the server. + knock_key: Option<[u8; 32]>, } /// All mutable state of the reliable handshake adapter. @@ -353,17 +517,34 @@ impl ReliableHsAdapter { socket: Arc, state: Arc>, write_notify: Arc, + knock_key: Option<[u8; 32]>, ) -> Self { Self { socket, state, write_notify, + knock_key, } } /// Build and send one HS datagram carrying `msg_bytes` at sequence `seq` with the current ack. - async fn send_hs(socket: &PeerSocket, seq: u16, ack_upto: u16, msg_bytes: &[u8]) { - let mut dg = Vec::with_capacity(HS_PREFIX_LEN + msg_bytes.len()); + /// + /// When `knock_key` is `Some`, the 16-byte port-knock token for the current Unix-minute is + /// prefixed to the datagram (probe-resistance; see [`UdpOpts::knock_required`]). When `None`, + /// the datagram is emitted unchanged — matches the historical wire layout. + async fn send_hs( + socket: &PeerSocket, + seq: u16, + ack_upto: u16, + msg_bytes: &[u8], + knock_key: Option<&[u8; 32]>, + ) { + let knock_pad = if knock_key.is_some() { KNOCK_LEN } else { 0 }; + let mut dg = Vec::with_capacity(knock_pad + HS_PREFIX_LEN + msg_bytes.len()); + if let Some(key) = knock_key { + let token = knock_for_minute(key, current_unix_minute()); + dg.extend_from_slice(&token); + } dg.push(TYPE_HS); dg.extend_from_slice(&seq.to_be_bytes()); dg.extend_from_slice(&ack_upto.to_be_bytes()); @@ -399,7 +580,14 @@ impl ReliableHsAdapter { st.unacked.insert(seq, msg.clone()); (seq, ack, msg) }; - Self::send_hs(&self.socket, to_send.0, to_send.1, &to_send.2).await; + Self::send_hs( + &self.socket, + to_send.0, + to_send.1, + &to_send.2, + self.knock_key.as_ref(), + ) + .await; } } @@ -435,7 +623,7 @@ impl ReliableHsAdapter { let st = self.state.lock().await; (st.next_send_seq, st.ack_upto()) }; - Self::send_hs(&self.socket, seq, ack, &[]).await; + Self::send_hs(&self.socket, seq, ack, &[], self.knock_key.as_ref()).await; } /// Retransmit all currently-unacked HS datagrams (called on the RTO timer), each carrying the @@ -448,7 +636,7 @@ impl ReliableHsAdapter { (st.ack_upto(), batch) }; for (seq, msg) in batch { - Self::send_hs(&self.socket, seq, ack, &msg).await; + Self::send_hs(&self.socket, seq, ack, &msg, self.knock_key.as_ref()).await; } } @@ -573,6 +761,7 @@ async fn run_reliable_handshake( socket: Arc, state: Arc>, opts: UdpOpts, + knock_key: Option<[u8; 32]>, run_hs: F, ) -> anyhow::Result where @@ -586,13 +775,20 @@ where socket.clone(), state.clone(), write_notify.clone(), + knock_key, )); let writer = AdapterWrite(ReliableHsAdapter::new( socket.clone(), state.clone(), write_notify.clone(), + knock_key, )); - let driver = ReliableHsAdapter::new(socket.clone(), state.clone(), write_notify.clone()); + let driver = ReliableHsAdapter::new( + socket.clone(), + state.clone(), + write_notify.clone(), + knock_key, + ); let hs_fut = run_hs(reader, writer); tokio::pin!(hs_fut); @@ -710,16 +906,38 @@ impl AsyncWrite for AdapterWrite { /// surfaces as an error. Late handshake retransmits (`0x01` HS datagrams) seen on the data path are /// dropped. Send and receive use **separate** [`tokio::sync::Mutex`]es, so the two directions run /// concurrently. +/// +/// When [`UdpOpts::cover_traffic_enabled`] is set, the constructor spawns a background task that +/// injects encrypted [`Frame::Ping`]s during idle periods (see the type-level "Cover traffic" docs +/// on [`UdpOpts`]); the task is `abort`ed on `Drop`. pub struct UdpConnection { socket: Arc, - sender: Mutex, + sender: Arc>, receiver: Mutex, peer_id: Option, opts: UdpOpts, + /// Wall-clock ms of the last datagram **we** emitted on the data path (DATA `0x02`). Updated by + /// [`PacketConnection::send_packet`] and by [`PacketConnection::recv_packet`] every time the + /// receive path emits a `Pong` reply, and read by the cover task to skip an attempt when the + /// link has not been idle. `Arc` so the cover task observes the same counter without + /// contending on the send mutex. + last_send_ms: Arc, /// `Some` for server-side connections (keeps the [`UdpServer`]'s master loop alive past the /// server handle being dropped); `None` for client-side connections (the ephemeral /// `connect()`ed socket lives inside the [`PeerSocket`] and needs no external task). _master_task: Option>, + /// `Some` when [`UdpOpts::cover_traffic_enabled`] was set at construction; `Drop` aborts the + /// task so dropping the connection does not leak it. `None` keeps the old wire-silent behaviour. + _cover_task: Option, +} + +/// RAII guard that aborts the cover-traffic task on drop. Wrapping the `JoinHandle` keeps the +/// `Drop` impl trivial and avoids the temptation to leak it. +struct CoverTaskGuard(tokio::task::JoinHandle<()>); +impl Drop for CoverTaskGuard { + fn drop(&mut self) { + self.0.abort(); + } } impl UdpConnection { @@ -728,13 +946,29 @@ impl UdpConnection { opts: UdpOpts, master_task: Option>, ) -> Self { + let sender = Arc::new(Mutex::new(est.sender)); + // Seed the idle clock to *now* so the cover task's first attempt waits a full interval — + // we don't want a cover Ping firing on the same millisecond the connection establishes. + let last_send_ms = Arc::new(AtomicU64::new(unix_ms())); + let cover_task = if opts.cover_traffic_enabled { + Some(CoverTaskGuard(tokio::spawn(cover_traffic_loop( + est.socket.clone(), + sender.clone(), + last_send_ms.clone(), + opts, + )))) + } else { + None + }; Self { socket: est.socket, - sender: Mutex::new(est.sender), + sender, receiver: Mutex::new(est.receiver), peer_id: est.peer_id, opts, + last_send_ms, _master_task: master_task, + _cover_task: cover_task, } } @@ -752,6 +986,35 @@ impl UdpConnection { } } +/// Pack an already-sealed AEAD record into one DATA datagram (`0x02 || rec_len(u16) || rec`), +/// applying obfuscation padding to the next bucket of `padding_profile` if `obfuscate` is set. +/// +/// Shared by [`PacketConnection::send_packet`], the Ping/Pong reply branch in +/// [`PacketConnection::recv_packet`], and the cover-traffic loop — they all produce identical +/// on-wire framing. +fn pack_data_datagram(rec: &[u8], obfuscate: bool, padding_profile: u8) -> Vec { + let rec_len = rec.len(); + debug_assert!( + rec_len <= u16::MAX as usize, + "sealed record exceeds u16 len" + ); + let mut dg = Vec::with_capacity(DATA_PREFIX_LEN + rec_len); + dg.push(TYPE_DATA); + dg.extend_from_slice(&(rec_len as u16).to_be_bytes()); + dg.extend_from_slice(rec); + if obfuscate { + let target = padding::next_bucket_for_profile(dg.len(), padding_profile); + if target > dg.len() { + let pad = target - dg.len(); + let mut pad_bytes = vec![0u8; pad]; + use rand::RngCore; + rand::thread_rng().fill_bytes(&mut pad_bytes); + dg.extend_from_slice(&pad_bytes); + } + } + dg +} + #[async_trait] impl PacketConnection for UdpConnection { async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> { @@ -762,32 +1025,10 @@ impl PacketConnection for UdpConnection { payload: Bytes::copy_from_slice(packet), }) }; - let rec_len = rec.len(); - debug_assert!( - rec_len <= u16::MAX as usize, - "sealed record exceeds u16 len" - ); - - let mut dg = Vec::with_capacity(DATA_PREFIX_LEN + rec_len); - dg.push(TYPE_DATA); - dg.extend_from_slice(&(rec_len as u16).to_be_bytes()); - dg.extend_from_slice(&rec); - - if self.opts.obfuscate { - // Pad the *whole datagram* up to the next size bucket of the configured padding - // profile (the daily mask picks the profile id). The receiver reads exactly `rec_len` - // of the sealed record and ignores the trailing pad bytes. - let target = padding::next_bucket_for_profile(dg.len(), self.opts.padding_profile); - if target > dg.len() { - let pad = target - dg.len(); - let mut pad_bytes = vec![0u8; pad]; - use rand::RngCore; - rand::thread_rng().fill_bytes(&mut pad_bytes); - dg.extend_from_slice(&pad_bytes); - } - } - + let dg = pack_data_datagram(&rec, self.opts.obfuscate, self.opts.padding_profile); self.socket.send_dgram(&dg).await?; + // Mark the link as non-idle so the cover-traffic loop skips its next attempt. + self.last_send_ms.store(unix_ms(), Ordering::Relaxed); Ok(()) } @@ -824,11 +1065,14 @@ impl PacketConnection for UdpConnection { let mut tx = self.sender.lock().await; tx.seal(&Frame::Pong { seq }) }; - let mut out = Vec::with_capacity(DATA_PREFIX_LEN + rec.len()); - out.push(TYPE_DATA); - out.extend_from_slice(&(rec.len() as u16).to_be_bytes()); - out.extend_from_slice(&rec); + let out = pack_data_datagram( + &rec, + self.opts.obfuscate, + self.opts.padding_profile, + ); self.socket.send_dgram(&out).await?; + // A Pong is just as good as a Data send for cover-traffic suppression. + self.last_send_ms.store(unix_ms(), Ordering::Relaxed); } Frame::Pong { .. } => continue, Frame::Close { code, reason } => { @@ -844,6 +1088,61 @@ impl PacketConnection for UdpConnection { } } +/// Background task body: emit encrypted [`Frame::Ping`] chaff during idle periods so the on-wire +/// byte rate stays roughly constant, masking user activity (typing, voice, idle). +/// +/// One iteration: +/// 1. Sample a uniform delay in `mean * (1 ± jitter)` (clamped to ≥ 1 ms) and sleep that long. +/// 2. If we sent anything in the last `delay_ms` (the link was not idle), skip — user traffic +/// suppresses chaff one-for-one. +/// 3. Otherwise seal one `Frame::Ping { seq = random }` and ship it as a DATA datagram. The peer's +/// `recv_packet` answers with a Pong, which our `recv_packet` then drops on the floor — fully +/// invisible to the application layer. +/// +/// The receiver-side cover work for the Pong reply happens on the **peer's** existing `recv_packet` +/// loop, not here — so this task spawns only an outbound writer; no extra reader is needed. +async fn cover_traffic_loop( + socket: Arc, + sender: Arc>, + last_send_ms: Arc, + opts: UdpOpts, +) { + use rand::Rng; + // Defensive clamp: a misconfigured caller setting `mean = 0` would spin tight. + let mean = opts.cover_mean_interval_ms.max(1) as f64; + let j = opts.cover_jitter.clamp(0.0, 0.999) as f64; + loop { + // Uniform delay in [mean*(1-j), mean*(1+j)], floored at 1 ms. + let r: f64 = rand::thread_rng().gen_range(-1.0..=1.0); + let delay_ms = ((mean * (1.0 + r * j)).max(1.0)) as u64; + tokio::time::sleep(Duration::from_millis(delay_ms)).await; + + // Idle check: if any DATA datagram was emitted within the last `delay_ms`, the link is busy + // and chaff would just add overhead. Skip this round. + let now_ms = unix_ms(); + let last = last_send_ms.load(Ordering::Relaxed); + if now_ms.saturating_sub(last) < delay_ms { + continue; + } + + // Seal one Ping with a random seq and pack it as a DATA datagram. + let rec = { + let mut tx = sender.lock().await; + let seq: u32 = rand::thread_rng().gen(); + tx.seal(&Frame::Ping { seq }) + }; + let dg = pack_data_datagram(&rec, opts.obfuscate, opts.padding_profile); + if let Err(e) = socket.send_dgram(&dg).await { + // A transient send failure (e.g. UnreachableHost during reconfig) is best-effort; + // log and keep trying. A permanent failure will be surfaced by the real send path. + tracing::debug!("cover-traffic send failed: {e}"); + continue; + } + // Treat the cover send as "we sent something" so back-to-back ticks do not bunch up. + last_send_ms.store(now_ms, Ordering::Relaxed); + } +} + /// Per-peer inbox capacity in the server's master loop demuxer. /// /// 128 datagrams is comfortably more than a single handshake flight (a handful of messages) @@ -1007,9 +1306,27 @@ async fn server_master_loop( }; let dg = buf[..n].to_vec(); - // Existing peer (handshake-in-progress OR established): hand it to that peer's inbox. + // Cheap RwLock read per datagram so a runtime rotation of the knock key/flag takes effect + // for new traffic immediately. + let opts_now = *opts.read().await; + let knock_active = opts_now.knock_required && opts_now.knock_key.is_some(); + + // Existing peer (handshake-in-progress OR established): hand it to that peer's inbox, + // stripping the knock prefix on HS datagrams when knocking is on (the peer's adapter expects + // the plain `0x01 || ...` wire layout). DATA datagrams (`0x02`) and stray bytes are passed + // through unchanged so already-established connections keep working without the prefix. if let Some(tx) = peers.get(&from) { - match tx.try_send(dg) { + let routed = if knock_active { + strip_knock_for_known_peer(&dg) + } else { + Some(dg) + }; + let Some(routed) = routed else { + // Malformed-when-knock-required (no `0x01` after stripping the 16-byte prefix and + // not a DATA datagram): silently drop, same as for unknown peers. + continue; + }; + match tx.try_send(routed) { Ok(()) => {} Err(mpsc::error::TrySendError::Full(_)) => { tracing::warn!("udp inbox full for {from}, dropping datagram"); @@ -1023,22 +1340,38 @@ async fn server_master_loop( continue; } - // Unknown source: only a leading HS byte is allowed to spawn a fresh peer. Late stray - // data datagrams from sources we forgot are silently dropped. - if dg.is_empty() || dg[0] != TYPE_HS { + // Unknown source: only a leading HS byte (after optional knock stripping) may spawn a fresh + // peer. Late stray data datagrams from sources we forgot are silently dropped. + let first_hs_dg = if knock_active { + // `unwrap()` is safe under `knock_active` (it's set only when the key is `Some`). + let key = opts_now.knock_key.expect("knock_active implies a key"); + match validate_and_strip_knock(&dg, &key) { + Some(stripped) => stripped, + None => { + // Silently drop — a probe never gets a reply or even a log at info level. UDP + // looks closed to scanners. Keep one debug line for legitimate operators. + tracing::debug!("udp port-knock failed from {from}; dropping (probe?)"); + continue; + } + } + } else if dg.is_empty() || dg[0] != TYPE_HS { continue; - } + } else { + dg + }; - // Register the peer and pre-load the inbox with its first datagram so the spawned - // handshake task picks it up on its first `recv_dgram`. + // Register the peer and pre-load the inbox with its first (post-knock-strip) datagram so + // the spawned handshake task picks it up on its first `recv_dgram`. let (inbox_tx, inbox_rx) = mpsc::channel::>(PEER_INBOX_CAPACITY); // Capacity > 0, so this `try_send` cannot fail; ignore the result defensively. - let _ = inbox_tx.try_send(dg); + let _ = inbox_tx.try_send(first_hs_dg); peers.insert(from, inbox_tx); // Snapshot opts for this peer's lifetime so a concurrent rotation does not change wire - // behaviour mid-handshake (matches the single-peer impl's contract). - let opts_snap = *opts.read().await; + // behaviour mid-handshake (matches the single-peer impl's contract). We already snapshotted + // at the top of the loop iteration for the knock check; reuse that exact value so the + // routing decision and the spawned task agree. + let opts_snap = opts_now; let cfg = proto_cfg.clone(); let master_for_peer = master.clone(); let acc = accept_tx.clone(); @@ -1052,12 +1385,19 @@ async fn server_master_loop( }, }); let state = Arc::new(Mutex::new(HsState::new())); - let result = - run_reliable_handshake(peer_socket, state, opts_snap, move |r, w| async move { + // Server never knock-prefixes its outgoing HS datagrams (only the client does — see the + // `Probe resistance` docs on `UdpOpts`). Pass `None` regardless of `opts_snap.knock_*`. + let result = run_reliable_handshake( + peer_socket, + state, + opts_snap, + None, + move |r, w| async move { let session = server_handshake(r, w, &cfg).await?; Ok(session.into_datagram_parts()) - }) - .await; + }, + ) + .await; match result { Ok(est) => { // Pin the master task alive while this connection lives: upgrading `Weak` @@ -1123,10 +1463,23 @@ impl UdpClient { // Fresh (unseeded) state: the client speaks first (ClientHello). let state = Arc::new(Mutex::new(HsState::new())); - let est = run_reliable_handshake(peer_socket, state, opts, move |r, w| async move { - let session = client_handshake(r, w, &proto_cfg).await?; - Ok(session.into_datagram_parts()) - }) + // Client knocks if (and only if) BOTH `knock_required` is set AND a key was supplied; this + // matches the server's accept policy: missing key on either side ⇒ knocking effectively off. + let knock_key = if opts.knock_required { + opts.knock_key + } else { + None + }; + let est = run_reliable_handshake( + peer_socket, + state, + opts, + knock_key, + move |r, w| async move { + let session = client_handshake(r, w, &proto_cfg).await?; + Ok(session.into_datagram_parts()) + }, + ) .await?; // Client side has no master loop to keep alive — the ephemeral connected socket lives in @@ -1269,4 +1622,144 @@ mod tests { let msg_b: Vec = st.out_partial.drain(..total).collect(); assert_eq!(msg_b, b); } + + // -- Port-knocking helpers ----------------------------------------------------------------- + + /// A constant 32-byte key shared by the unit tests below. + fn test_key() -> [u8; 32] { + let mut k = [0u8; 32]; + for (i, b) in k.iter_mut().enumerate() { + *b = i as u8; + } + k + } + + /// Build a knocked HS datagram for an arbitrary minute, with a trivial trailing payload. The + /// test cares only about the prefix-validation logic, not the wrapped HS message. + fn make_knocked_hs(key: &[u8; 32], minute: u64) -> Vec { + let token = knock_for_minute(key, minute); + let mut dg = Vec::with_capacity(KNOCK_LEN + HS_PREFIX_LEN + 8); + dg.extend_from_slice(&token); + dg.push(TYPE_HS); + dg.extend_from_slice(&0u16.to_be_bytes()); // hs_seq = 0 + dg.extend_from_slice(&ACK_NONE.to_be_bytes()); // ack = none + dg.extend_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]); + dg + } + + #[test] + fn knock_for_minute_is_deterministic_and_minute_sensitive() { + let k = test_key(); + // Same input → same output. + assert_eq!( + knock_for_minute(&k, 1_000_000), + knock_for_minute(&k, 1_000_000) + ); + // Different minute → different output. + assert_ne!( + knock_for_minute(&k, 1_000_000), + knock_for_minute(&k, 1_000_001) + ); + // Different key → different output. + let mut k2 = k; + k2[0] ^= 1; + assert_ne!( + knock_for_minute(&k, 1_000_000), + knock_for_minute(&k2, 1_000_000) + ); + } + + #[test] + fn udp_knock_tolerates_clock_skew() { + // Cover the spec test name: a datagram knocked for `now-1` / `now+1` must still validate at + // the server, but `now-2` / `now+2` must NOT (window is ±1 minute). + let key = test_key(); + let now = current_unix_minute(); + + for minute in [now, now.saturating_sub(1), now.saturating_add(1)] { + let dg = make_knocked_hs(&key, minute); + let stripped = validate_and_strip_knock(&dg, &key).unwrap_or_else(|| { + panic!("expected validation pass for minute {minute} (now={now})") + }); + assert_eq!( + stripped[0], TYPE_HS, + "first byte after strip must be the HS type", + ); + // The stripped tail is exactly the original datagram minus the 16-byte prefix. + assert_eq!(stripped, &dg[KNOCK_LEN..]); + } + + // Two minutes away (in either direction) must fail. + for minute in [now.saturating_sub(2), now.saturating_add(2)] { + let dg = make_knocked_hs(&key, minute); + assert!( + validate_and_strip_knock(&dg, &key).is_none(), + "minute {minute} (now={now}) should fall outside the ±1 acceptance window", + ); + } + + // Garbage prefix never validates. + let mut bad = make_knocked_hs(&key, now); + bad[0] ^= 0xFF; + assert!( + validate_and_strip_knock(&bad, &key).is_none(), + "tampered knock must fail" + ); + + // Wrong layout: missing `0x01` after the 16 bytes — must fail (and not panic). + let mut short = vec![0u8; KNOCK_LEN]; // 16 zero bytes + short.push(0xAA); // not TYPE_HS + assert!(validate_and_strip_knock(&short, &key).is_none()); + // Too short overall: must fail without panic. + let tiny = vec![0u8; KNOCK_LEN - 1]; + assert!(validate_and_strip_knock(&tiny, &key).is_none()); + } + + #[test] + fn known_peer_strip_handles_data_and_hs_paths() { + // DATA datagrams are passed through unchanged (no knock prefix on the data path). + let data = vec![TYPE_DATA, 0x00, 0x05, 1, 2, 3, 4, 5]; + assert_eq!(strip_knock_for_known_peer(&data), Some(data.clone())); + + // HS with a 16-byte (any-bytes) prefix is stripped without validation. + let mut hs = vec![0xCDu8; KNOCK_LEN]; + hs.push(TYPE_HS); + hs.extend_from_slice(&[0, 0, 0xFF, 0xFF, 9, 9, 9]); + let stripped = strip_knock_for_known_peer(&hs).expect("known-peer strip succeeds"); + assert_eq!(stripped[0], TYPE_HS); + assert_eq!(stripped, hs[KNOCK_LEN..]); + + // Empty: dropped. + assert!(strip_knock_for_known_peer(&[]).is_none()); + + // Junk: dropped. + let junk = vec![0xFFu8; 32]; + assert!(strip_knock_for_known_peer(&junk).is_none()); + } + + // -- Cover-traffic packing ------------------------------------------------------------------ + + #[test] + fn pack_data_datagram_layout_no_obfuscate() { + let rec = [1u8, 2, 3, 4, 5]; + let dg = pack_data_datagram(&rec, false, 0); + assert_eq!(dg[0], TYPE_DATA); + assert_eq!(u16::from_be_bytes([dg[1], dg[2]]) as usize, rec.len()); + assert_eq!(&dg[DATA_PREFIX_LEN..], &rec); + // No padding when obfuscate is off. + assert_eq!(dg.len(), DATA_PREFIX_LEN + rec.len()); + } + + #[test] + fn pack_data_datagram_pads_when_obfuscate_set() { + let rec = [0u8; 10]; + let dg = pack_data_datagram(&rec, true, 0); + // Padded up to at least the next bucket; the canonical buckets start above 10 + 3 = 13. + assert!( + dg.len() >= DATA_PREFIX_LEN + rec.len(), + "padded datagram is at least the minimum encoded length", + ); + // Header is still correct (rec_len is unchanged, padding is appended). + assert_eq!(u16::from_be_bytes([dg[1], dg[2]]) as usize, rec.len()); + } } diff --git a/crates/aura-transport/tests/udp_cover.rs b/crates/aura-transport/tests/udp_cover.rs new file mode 100644 index 0000000..ec42c11 --- /dev/null +++ b/crates/aura-transport/tests/udp_cover.rs @@ -0,0 +1,330 @@ +//! Integration tests for the UDP **cover-traffic** (idle-chaff) feature. +//! +//! These exercise the end-to-end behaviour of [`UdpOpts::cover_traffic_enabled`] / +//! [`UdpOpts::cover_mean_interval_ms`] / [`UdpOpts::cover_jitter`]: +//! +//! * [`udp_cover_traffic_fires_on_idle`] — cover enabled on the client; with no user data, the +//! server's `recv_packet` returns at least one Pong-flavoured artefact (a Pong sealed by the +//! server in response to the client's cover Ping is what the client would have seen if it called +//! `recv_packet`). We instead drive a known-payload round trip *after* an idle window to assert +//! that things keep working even when the cover task is running. +//! * [`udp_cover_traffic_skipped_under_load`] — drive 50 packets in ≈1 second; the cover task must +//! skip every attempt (link is non-idle), so the total number of datagrams the server observes is +//! ≈ 50, not noticeably more. +//! * [`udp_cover_traffic_disabled_back_compat`] — defaults give exactly the pre-feature wire +//! silence: no extra Pings, no extra Pongs after a 1 s idle window. + +use std::net::SocketAddr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use aura_pki::AuraCa; +use aura_proto::{ClientConfig, PacketConnection, ServerConfig}; +use aura_transport::{UdpClient, UdpConnection, UdpOpts, UdpServer}; +use tokio::net::UdpSocket; + +const SERVER_NAME: &str = "localhost"; +const CLIENT_ID: &str = "client-cover"; + +fn make_configs() -> (ServerConfig, ClientConfig) { + let ca = AuraCa::generate("Aura UDP Cover Test CA").expect("generate CA"); + let server_cert = ca + .issue_server_cert(SERVER_NAME) + .expect("issue server cert"); + let client_cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert"); + let ca_pem = ca.ca_cert_pem(); + let server_cfg = ServerConfig { + ca_cert_pem: ca_pem.clone(), + server_cert_pem: server_cert.cert_pem, + server_key_pem: server_cert.key_pem, + }; + let client_cfg = ClientConfig { + ca_cert_pem: ca_pem, + client_cert_pem: client_cert.cert_pem, + client_key_pem: client_cert.key_pem, + server_name: SERVER_NAME.to_string(), + }; + (server_cfg, client_cfg) +} + +/// A datagram-counting forwarder that relays UDP between a "front" (clients connect here) and a +/// "back" (real server), counting each direction. We use this to observe wire-level traffic +/// directly — cover-Ping → cover-Pong both pass through, so the counters reflect total chaff. +async fn spawn_counting_forwarder( + server_addr: SocketAddr, +) -> (SocketAddr, Arc, Arc) { + let front = UdpSocket::bind("127.0.0.1:0").await.expect("bind front"); + let back = UdpSocket::bind("127.0.0.1:0").await.expect("bind back"); + let front_addr = front.local_addr().expect("front addr"); + let front = Arc::new(front); + let back = Arc::new(back); + let c2s = Arc::new(AtomicU64::new(0)); + let s2c = Arc::new(AtomicU64::new(0)); + let client_addr: Arc>> = + Arc::new(tokio::sync::Mutex::new(None)); + + // Client -> Server. + { + let front = front.clone(); + let back = back.clone(); + let c2s = c2s.clone(); + let client_addr = client_addr.clone(); + tokio::spawn(async move { + let mut buf = vec![0u8; 4096]; + loop { + let (n, from) = match front.recv_from(&mut buf).await { + Ok(v) => v, + Err(_) => continue, + }; + { + let mut ca = client_addr.lock().await; + if ca.is_none() { + *ca = Some(from); + } + } + c2s.fetch_add(1, Ordering::Relaxed); + let _ = back.send_to(&buf[..n], server_addr).await; + } + }); + } + // Server -> Client. + { + let front = front.clone(); + let back = back.clone(); + let s2c = s2c.clone(); + let client_addr = client_addr.clone(); + tokio::spawn(async move { + let mut buf = vec![0u8; 4096]; + loop { + let (n, _) = match back.recv_from(&mut buf).await { + Ok(v) => v, + Err(_) => continue, + }; + let dest = { *client_addr.lock().await }; + if let Some(dest) = dest { + s2c.fetch_add(1, Ordering::Relaxed); + let _ = front.send_to(&buf[..n], dest).await; + } + } + }); + } + (front_addr, c2s, s2c) +} + +#[tokio::test] +async fn udp_cover_traffic_fires_on_idle() { + let (server_cfg, client_cfg) = make_configs(); + // Tight cover interval so the test does not have to wait long for chaff to show up. + let opts = UdpOpts { + cover_traffic_enabled: true, + cover_mean_interval_ms: 100, + cover_jitter: 0.2, + ..UdpOpts::default() + }; + + let server = + UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server"); + let server_addr = server.local_addr().expect("server addr"); + + // Put the counting forwarder between client and server so we can see chaff on the wire. + let (proxy_addr, c2s, s2c) = spawn_counting_forwarder(server_addr).await; + + let accept_task = tokio::spawn(async move { server.accept().await }); + let connect_task = + tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await }); + + let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) + .await + .expect("accept timely") + .expect("accept join") + .expect("server accept"); + let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) + .await + .expect("connect timely") + .expect("connect join") + .expect("client connect"); + + let server_conn: Arc = Arc::new(server_conn); + let client_conn: Arc = Arc::new(client_conn); + + // Snapshot counters after the handshake completes, then go idle for ~1.5 s. + let c2s_after_hs = c2s.load(Ordering::Relaxed); + let s2c_after_hs = s2c.load(Ordering::Relaxed); + + // Spawn a recv loop on the server side so it actually drives its read path (and replies to + // Pings). Without this, the client's cover Pings would sit in the OS socket buffer. + let server_for_recv = server_conn.clone(); + let recv_task = tokio::spawn(async move { + for _ in 0..20 { + // Each call returns on Data; cover Pings are answered internally without ever + // returning. So if we hit a timeout, that's expected: cover traffic does NOT surface + // as data. We're really just running the loop so the server processes Pings. + let _ = tokio::time::timeout(Duration::from_millis(300), server_for_recv.recv_packet()) + .await; + } + }); + + // Mirror on the client side: keep its recv path active so it consumes Pongs (otherwise the + // OS recv buffer fills with Pongs and the test sees them only on the wire counter, but the + // cover-task's own state stays clean). + let client_for_recv = client_conn.clone(); + let recv_task_c = tokio::spawn(async move { + for _ in 0..20 { + let _ = tokio::time::timeout(Duration::from_millis(300), client_for_recv.recv_packet()) + .await; + } + }); + + tokio::time::sleep(Duration::from_millis(1500)).await; + + let c2s_idle = c2s.load(Ordering::Relaxed); + let s2c_idle = s2c.load(Ordering::Relaxed); + let c2s_chaff = c2s_idle.saturating_sub(c2s_after_hs); + let s2c_chaff = s2c_idle.saturating_sub(s2c_after_hs); + + // With cover_mean_interval_ms = 100 over ~1.5 s, we should see > 0 chaff datagrams in each + // direction (client sends Pings; server replies with Pongs). We do not pin a tight bound to + // avoid flake on slow CI; even one chaff datagram per direction proves the feature is firing. + assert!( + c2s_chaff >= 1, + "expected at least one cover-traffic Ping client→server, got {c2s_chaff} (handshake baseline {c2s_after_hs})", + ); + assert!( + s2c_chaff >= 1, + "expected at least one cover-traffic Pong server→client, got {s2c_chaff} (handshake baseline {s2c_after_hs})", + ); + + drop(recv_task); + drop(recv_task_c); +} + +#[tokio::test] +async fn udp_cover_traffic_skipped_under_load() { + let (server_cfg, client_cfg) = make_configs(); + // Cover-task mean interval ~50 ms; user traffic must suppress chaff one-for-one because each + // send updates `last_send_ms`. + let opts = UdpOpts { + cover_traffic_enabled: true, + cover_mean_interval_ms: 50, + cover_jitter: 0.1, + ..UdpOpts::default() + }; + let server = + UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server"); + let server_addr = server.local_addr().expect("server addr"); + let (proxy_addr, c2s, _s2c) = spawn_counting_forwarder(server_addr).await; + + let accept_task = tokio::spawn(async move { server.accept().await }); + let connect_task = + tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await }); + + let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) + .await + .expect("accept timely") + .expect("accept join") + .expect("server accept"); + let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) + .await + .expect("connect timely") + .expect("connect join") + .expect("client connect"); + + let server_conn: Arc = Arc::new(server_conn); + let client_conn: Arc = Arc::new(client_conn); + + // Drain the server in the background so it actually processes incoming packets (and Pings). + let server_for_recv = server_conn.clone(); + let recv_task = tokio::spawn(async move { + let mut got = 0u32; + while let Ok(Ok(_)) = + tokio::time::timeout(Duration::from_millis(2000), server_for_recv.recv_packet()).await + { + got += 1; + if got >= 50 { + break; + } + } + got + }); + + let c2s_after_hs = c2s.load(Ordering::Relaxed); + + // Drive 50 user packets over ≈1 s. Each send updates last_send_ms, so the cover task's idle + // check (`now - last < delay_ms`) is true on every iteration and chaff is skipped. + let total = 50u32; + let pkt = vec![0xABu8; 100]; + let start = std::time::Instant::now(); + for _ in 0..total { + client_conn.send_packet(&pkt).await.expect("client send"); + // Pace the sends to ~1 packet every 20 ms (50 packets in 1 s); spacing < mean interval + // (50 ms) so the suppression check always wins. + tokio::time::sleep(Duration::from_millis(20)).await; + } + let elapsed = start.elapsed(); + assert!( + elapsed < Duration::from_secs(3), + "loop should finish under 3 s, took {elapsed:?}", + ); + + // Give the server a moment to drain. + let _ = tokio::time::timeout(Duration::from_secs(3), recv_task).await; + + let c2s_data = c2s.load(Ordering::Relaxed).saturating_sub(c2s_after_hs); + // Expect ≈ 50 datagrams from client to server (the user packets). Allow a small slack for one + // straggler cover Ping if a sleep wakes up just slightly late; tightly bound at ≤ 60. + assert!( + c2s_data >= total as u64, + "client must have sent at least {total} user packets, observed {c2s_data}", + ); + assert!( + c2s_data <= (total as u64) + 10, + "cover-traffic should be suppressed under load, but observed {c2s_data} datagrams (expected ≈ {total})", + ); +} + +#[tokio::test] +async fn udp_cover_traffic_disabled_back_compat() { + let (server_cfg, client_cfg) = make_configs(); + let opts = UdpOpts::default(); // cover_traffic_enabled: false + + let server = + UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server"); + let server_addr = server.local_addr().expect("server addr"); + let (proxy_addr, c2s, s2c) = spawn_counting_forwarder(server_addr).await; + + let accept_task = tokio::spawn(async move { server.accept().await }); + let connect_task = + tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await }); + + let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) + .await + .expect("accept timely") + .expect("accept join") + .expect("server accept"); + let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) + .await + .expect("connect timely") + .expect("connect join") + .expect("client connect"); + + // After the handshake, both sides go fully idle for 1 s. Nothing must hit the wire. + let c2s_after_hs = c2s.load(Ordering::Relaxed); + let s2c_after_hs = s2c.load(Ordering::Relaxed); + + // Hold references so the connections do not drop early. + let _hold_server = server_conn; + let _hold_client = client_conn; + + tokio::time::sleep(Duration::from_secs(1)).await; + assert_eq!( + c2s.load(Ordering::Relaxed), + c2s_after_hs, + "no client→server chaff with cover disabled", + ); + assert_eq!( + s2c.load(Ordering::Relaxed), + s2c_after_hs, + "no server→client chaff with cover disabled", + ); +} diff --git a/crates/aura-transport/tests/udp_knock.rs b/crates/aura-transport/tests/udp_knock.rs new file mode 100644 index 0000000..d699439 --- /dev/null +++ b/crates/aura-transport/tests/udp_knock.rs @@ -0,0 +1,193 @@ +//! Integration tests for the UDP **port-knocking** (probe resistance) feature. +//! +//! These exercise the end-to-end behaviour of [`UdpOpts::knock_required`] / [`UdpOpts::knock_key`]: +//! +//! * [`udp_knock_required_silent_drop_on_missing_or_wrong`] — server requires knocking; client does +//! not knock → server stays silent (no reply within 1 s, so a passive scanner sees a closed port). +//! * [`udp_knock_required_accepts_valid`] — both sides knock with the same key → handshake completes +//! like usual; the inner Aura mutual auth still drives the auth decision. +//! * [`udp_knock_disabled_back_compat`] — both sides disabled → exact pre-feature wire behaviour. +//! +//! The clock-skew tolerance test (±1 minute) lives as a unit test inside `src/udp.rs` so it can +//! drive [`validate_and_strip_knock`] directly with a faked "now" — much sharper than racing the +//! wall clock here. + +use std::sync::Arc; +use std::time::Duration; + +use aura_pki::AuraCa; +use aura_proto::{ClientConfig, PacketConnection, ServerConfig}; +use aura_transport::{UdpClient, UdpConnection, UdpOpts, UdpServer}; +use tokio::net::UdpSocket; + +const SERVER_NAME: &str = "localhost"; +const CLIENT_ID: &str = "client-knock"; + +/// Mint CA + server + client cert/key triples, returning matching handshake configs. +fn make_configs() -> (ServerConfig, ClientConfig) { + let ca = AuraCa::generate("Aura UDP Knock Test CA").expect("generate CA"); + let server_cert = ca + .issue_server_cert(SERVER_NAME) + .expect("issue server cert"); + let client_cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert"); + let ca_pem = ca.ca_cert_pem(); + let server_cfg = ServerConfig { + ca_cert_pem: ca_pem.clone(), + server_cert_pem: server_cert.cert_pem, + server_key_pem: server_cert.key_pem, + }; + let client_cfg = ClientConfig { + ca_cert_pem: ca_pem, + client_cert_pem: client_cert.cert_pem, + client_key_pem: client_cert.key_pem, + server_name: SERVER_NAME.to_string(), + }; + (server_cfg, client_cfg) +} + +/// A 32-byte test knock key; in production this is `SHA-256(CA-cert-DER)` (the CLI computes it), +/// but for the transport-level tests any well-known constant is fine — both sides just need the +/// same bytes. +fn test_knock_key() -> [u8; 32] { + let mut k = [0u8; 32]; + for (i, b) in k.iter_mut().enumerate() { + *b = (i as u8).wrapping_mul(13).wrapping_add(7); + } + k +} + +#[tokio::test] +async fn udp_knock_required_silent_drop_on_missing_or_wrong() { + let (server_cfg, _client_cfg) = make_configs(); + + // Server: require knocking with our test key. Tighten the handshake timeout so the test does + // not have to wait the default 10 s for the (never-arriving) handshake. + let server_opts = UdpOpts { + knock_required: true, + knock_key: Some(test_knock_key()), + hs_timeout: Duration::from_secs(2), + ..UdpOpts::default() + }; + let server = UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, server_opts) + .expect("bind udp server"); + let server_addr = server.local_addr().expect("server local_addr"); + + // Bind a raw client socket and send a single *unknocked* HS-shaped datagram. We do NOT run + // `UdpClient::connect` here because that would inject the proto handshake's ClientHello and we + // want to assert "the server is silent at the wire level". + let raw_client = UdpSocket::bind("127.0.0.1:0") + .await + .expect("bind raw client"); + raw_client.connect(server_addr).await.expect("raw connect"); + + // Wire layout the server expects when knock is OFF: 0x01 (HS) || hs_seq(2) || ack(2) || msg. + // No knock prefix → the server's master loop must drop this silently. + let mut unknocked_hs = vec![0x01u8, 0x00, 0x00, 0xFF, 0xFF]; + // Append some plausible-looking handshake-message bytes so the datagram is non-trivially sized. + unknocked_hs.extend_from_slice(&[0u8; 64]); + + raw_client + .send(&unknocked_hs) + .await + .expect("send unknocked HS"); + + // The server must NOT reply. Wait 1 s for any inbound datagram; recv_from must time out. + let mut buf = [0u8; 1024]; + let res = tokio::time::timeout(Duration::from_secs(1), raw_client.recv(&mut buf)).await; + assert!( + res.is_err(), + "server replied to an unknocked HS datagram (got {} bytes), expected wire silence", + res.unwrap_or(Ok(0)).unwrap_or(0), + ); + + // Cleanup: drop the server explicitly (also tears down the master loop). + drop(server); +} + +#[tokio::test] +async fn udp_knock_required_accepts_valid() { + let (server_cfg, client_cfg) = make_configs(); + let key = test_knock_key(); + + let opts = UdpOpts { + knock_required: true, + knock_key: Some(key), + ..UdpOpts::default() + }; + + let server = + UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server"); + let server_addr = server.local_addr().expect("server local_addr"); + + let accept_task = tokio::spawn(async move { server.accept().await }); + let connect_task = + tokio::spawn(async move { UdpClient::connect(server_addr, client_cfg, opts).await }); + + let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) + .await + .expect("server accept timely") + .expect("accept join") + .expect("server accept"); + let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) + .await + .expect("client connect timely") + .expect("connect join") + .expect("client connect"); + + assert_eq!( + server_conn.peer_id(), + Some(CLIENT_ID), + "server learned client CN — handshake completed through knocking", + ); + + // Round-trip a packet both ways to prove the data path also works under knocking. + let server_conn: Arc = Arc::new(server_conn); + let client_conn: Arc = Arc::new(client_conn); + client_conn + .send_packet(b"knock knock") + .await + .expect("client send"); + let got = tokio::time::timeout(Duration::from_secs(5), server_conn.recv_packet()) + .await + .expect("server recv timely") + .expect("server recv"); + assert_eq!(got, b"knock knock"); +} + +#[tokio::test] +async fn udp_knock_disabled_back_compat() { + let (server_cfg, client_cfg) = make_configs(); + let opts = UdpOpts::default(); // knock_required: false, knock_key: None + + let server = + UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server"); + let server_addr = server.local_addr().expect("server local_addr"); + + let accept_task = tokio::spawn(async move { server.accept().await }); + let connect_task = + tokio::spawn(async move { UdpClient::connect(server_addr, client_cfg, opts).await }); + + let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task) + .await + .expect("server accept timely") + .expect("accept join") + .expect("server accept"); + let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task) + .await + .expect("client connect timely") + .expect("connect join") + .expect("client connect"); + + assert_eq!(server_conn.peer_id(), Some(CLIENT_ID)); + let server_conn: Arc = Arc::new(server_conn); + let client_conn: Arc = Arc::new(client_conn); + client_conn + .send_packet(b"no-knock") + .await + .expect("client send"); + let got = tokio::time::timeout(Duration::from_secs(5), server_conn.recv_packet()) + .await + .expect("server recv timely") + .expect("server recv"); + assert_eq!(got, b"no-knock"); +}