From 866b9f427a848088dea9b4204375e03d09319698 Mon Sep 17 00:00:00 2001 From: xah30 Date: Mon, 25 May 2026 19:10:43 +0300 Subject: [PATCH] feat(transport): custom UDP post-quantum transport (own tunneling, no QUIC) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Aura's own data path over plain UDP, authenticated solely by the existing Aura PQ handshake (hybrid X25519+ML-KEM-768 + mutual X.509) — no QUIC, no outer TLS. - One UDP socket, two phases by type byte: 0x01 HS (reliable handshake), 0x02 DATA (datagram records). HS = DTLS-flight reliability over UDP: per-message seq, cumulative acks, retransmit (RTO), reorder/dedup, post-handshake linger; message boundaries parsed from the 5-byte Aura header. DATA = one explicit- nonce AEAD record per datagram (seq||AEAD), replay-checked, optional padding to HTTPS size buckets (obfuscation). - UdpServer/UdpClient/UdpConnection (impl PacketConnection, concurrent send/recv). v1: single peer per accept (multi-client demux is a follow-up). - 5 adapter unit tests + udp loopback end-to-end (obfuscation on, 1300B/empty/ duplex) + handshake-survives-30%-loss-and-reorder. No new deps. QUIC tests preserved. Whole workspace builds; clippy/fmt clean. Co-Authored-By: Claude Opus 4.7 --- crates/aura-transport/src/lib.rs | 7 + crates/aura-transport/src/udp.rs | 1086 +++++++++++++++++++ crates/aura-transport/tests/udp_loopback.rs | 342 ++++++ 3 files changed, 1435 insertions(+) create mode 100644 crates/aura-transport/src/udp.rs create mode 100644 crates/aura-transport/tests/udp_loopback.rs diff --git a/crates/aura-transport/src/lib.rs b/crates/aura-transport/src/lib.rs index aafd0ec..e54b057 100644 --- a/crates/aura-transport/src/lib.rs +++ b/crates/aura-transport/src/lib.rs @@ -18,6 +18,11 @@ //! * [`mimicry`] — ALPN/SNI constants and [`mimicry::chrome_quic_transport_config`]. //! * [`padding`] — [`padding::pad_to_https_size`] / [`padding::inject_padding_frames`] traffic shaping. //! * [`conn`] — [`AuraConnection`], the [`aura_proto::PacketConnection`] implementation. +//! * [`udp`] — an alternative backend that carries Aura's *own* protocol over **plain UDP** +//! (no QUIC, no outer TLS): [`UdpServer`] / [`UdpClient`] / [`UdpConnection`]. The Aura PQ +//! handshake runs over a small DTLS-flight-style reliability adapter; application packets then ride +//! as unreliable explicit-nonce AEAD datagrams. This is the security-equivalent of the QUIC path +//! (the inner Aura handshake is the only authentication either way), minus the HTTP/3 disguise. //! //! ## Usage (Wave 4 / CLI) //! ```no_run @@ -62,11 +67,13 @@ pub mod conn; pub mod mimicry; pub mod padding; pub mod quic; +pub mod udp; pub use conn::AuraConnection; pub use mimicry::{alpn_protocols, chrome_quic_transport_config, ALPN_H3, DEFAULT_SNI}; pub use padding::{inject_padding_frames, pad_to_https_size, HTTPS_SIZE_BUCKETS}; pub use quic::{client_endpoint, server_endpoint, AcceptAnyServerCert}; +pub use udp::{UdpClient, UdpConnection, UdpOpts, UdpServer}; // Re-export the inner proto trait so downstream crates (the CLI) can name the connection as // `Arc` without a separate `aura_proto` import. diff --git a/crates/aura-transport/src/udp.rs b/crates/aura-transport/src/udp.rs new file mode 100644 index 0000000..ca3618c --- /dev/null +++ b/crates/aura-transport/src/udp.rs @@ -0,0 +1,1086 @@ +//! Aura's own post-quantum transport over **plain UDP** (project §7, UDP backend). +//! +//! This is an alternative to the QUIC backend ([`crate::quic`] / [`crate::conn`]): the data path is +//! Aura's *own* protocol carried directly over UDP datagrams, authenticated **solely** by the +//! existing Aura PQ handshake ([`aura_proto::client_handshake`] / [`aura_proto::server_handshake`]). +//! There is no QUIC and no outer TLS here — the only security boundary is the inner Aura handshake +//! (hybrid X25519 + ML-KEM-768 with mutual X.509), which is already post-quantum. +//! +//! ## Two phases, one socket +//! +//! A single [`tokio::net::UdpSocket`] carries both phases, distinguished by a 1-byte type prefix: +//! +//! * **Handshake (`0x01` HS).** The Aura proto handshake writes/reads *whole framed messages* over +//! an [`AsyncRead`](tokio::io::AsyncRead) + [`AsyncWrite`](tokio::io::AsyncWrite). UDP is lossy and +//! reorders, so we run the handshake over [`ReliableHsAdapter`], a small DTLS-flight-style +//! reliability layer (sequence numbers, cumulative acks, retransmit, reorder/dedup). See that type +//! for the details. +//! * **Data (`0x02` DATA).** After the handshake we take +//! [`Session::into_datagram_parts`](aura_proto::Session::into_datagram_parts) and ship each +//! application packet as one explicit-nonce AEAD record in one UDP datagram — unreliable, exactly +//! like the underlying network. Loss/reorder is the caller's concern (Aura tunnels IP packets, +//! which tolerate it), and the datagram codec is already replay-checked. +//! +//! ## Wire format +//! +//! Every UDP datagram begins with a 1-byte type: +//! +//! ```text +//! 0x01 HS : 0x01 || hs_seq(u16 BE) || ack_upto(u16 BE) || msg_bytes +//! 0x02 DATA : 0x02 || rec_len(u16 BE) || sealed_record [|| random_padding] +//! ``` +//! +//! For DATA, `sealed_record` is exactly `rec_len` bytes — one +//! [`DatagramSender::seal`](aura_proto::DatagramSender::seal) output. Any trailing bytes are +//! obfuscation padding and are ignored by the receiver (it reads exactly `rec_len`). +//! +//! ## Single peer per accepted connection (v1) +//! +//! [`UdpServer::accept`] handles **one** client per call: it waits for a client's first HS datagram, +//! latches that source address, runs the handshake bound to it, and returns a [`UdpConnection`] +//! dedicated to that peer. A server that wants to serve many clients concurrently on one well-known +//! port would need a demuxing layer (route datagrams to per-peer connections by source address); +//! that is out of scope for v1. The client side always `.connect()`s its ephemeral socket to the +//! server, so it only ever talks to one peer. + +use std::collections::BTreeMap; +use std::io; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; +use std::time::Duration; + +use async_trait::async_trait; +use bytes::Bytes; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::UdpSocket; +use tokio::sync::Mutex; + +use aura_proto::frame::{decode_header, HEADER_LEN}; +use aura_proto::{ + client_handshake, server_handshake, ClientConfig, DatagramReceiver, DatagramSender, Frame, + PacketConnection, ServerConfig, +}; + +use crate::padding; + +// --------------------------------------------------------------------------------------------- +// Wire constants +// --------------------------------------------------------------------------------------------- + +/// Datagram type byte for a reliable-handshake-transport datagram. +const TYPE_HS: u8 = 0x01; +/// Datagram type byte for a post-handshake (encrypted application) data datagram. +const TYPE_DATA: u8 = 0x02; + +/// HS header length after the type byte: `hs_seq(2) || ack_upto(2)`. +const HS_HDR_LEN: usize = 4; +/// Full HS datagram prefix length: the type byte plus [`HS_HDR_LEN`]; an HS datagram shorter than +/// this is truncated and ignored. +const HS_PREFIX_LEN: usize = 1 + HS_HDR_LEN; +/// DATA header length after the type byte: `rec_len(2)`. +const DATA_HDR_LEN: usize = 2; +/// Full DATA datagram prefix length: the type byte plus [`DATA_HDR_LEN`]. +const DATA_PREFIX_LEN: usize = 1 + DATA_HDR_LEN; + +/// Sentinel value of the on-wire `ack_upto` field meaning "I have received nothing yet" (so the peer +/// must not prune anything). Real handshakes use a handful of sequence numbers, never 0xFFFF, so this +/// is unambiguous and lets `ack_upto` otherwise carry the literal "highest contiguous seq received". +const ACK_NONE: u16 = u16::MAX; + +/// Generous upper bound on a single UDP datagram we will receive (largest Aura handshake message is +/// ~1253 bytes; data records are MTU-sized; this leaves slack for headers + obfuscation padding). +const RECV_BUF: usize = 2048; + +// --------------------------------------------------------------------------------------------- +// Options +// --------------------------------------------------------------------------------------------- + +/// Tunables for the UDP transport (handshake reliability timers and obfuscation). +/// +/// [`UdpOpts::default`] is a sensible production default: obfuscation off, a 250 ms retransmit +/// timeout, and a 10 s overall handshake deadline. +#[derive(Clone, Copy, Debug)] +pub struct UdpOpts { + /// When `true`, pad every outgoing DATA datagram up to the next + /// [`padding::HTTPS_SIZE_BUCKETS`] size class with random trailing bytes (the receiver ignores + /// the pad). Adds bandwidth overhead in exchange for a more uniform on-wire size distribution. + pub obfuscate: bool, + /// Handshake retransmit timeout (RTO): every `hs_rto`, all still-unacked HS datagrams are resent. + pub hs_rto: Duration, + /// Overall handshake deadline; if the handshake has not completed within this, it errors. + pub hs_timeout: Duration, + /// How long the post-handshake linger task keeps resending the final flight (so the peer's last + /// flight is not lost) before giving up if no DATA datagram arrives. + pub hs_linger: Duration, +} + +impl Default for UdpOpts { + fn default() -> Self { + Self { + obfuscate: false, + hs_rto: Duration::from_millis(250), + hs_timeout: Duration::from_secs(10), + hs_linger: Duration::from_secs(2), + } + } +} + +// --------------------------------------------------------------------------------------------- +// Peer-bound socket +// --------------------------------------------------------------------------------------------- + +/// A UDP socket bound to a single peer address. +/// +/// The client connects its ephemeral socket to the server, so it can use plain `send`/`recv`. The +/// server shares one listening socket and remembers the accepted client's address, so it uses +/// `send_to(peer)` and filters `recv_from` to that address. This type hides that asymmetry behind a +/// uniform datagram send/recv pair used by both the reliable handshake adapter and the data path. +#[derive(Debug)] +struct PeerSocket { + socket: UdpSocket, + /// `Some(addr)` for the server (it must address the specific client and ignore strangers); + /// `None` for the client (the socket is already `connect()`ed to the server). + peer: Option, +} + +impl PeerSocket { + /// Send one datagram to the bound peer. + async fn send_dgram(&self, buf: &[u8]) -> io::Result<()> { + match self.peer { + Some(addr) => { + self.socket.send_to(buf, addr).await?; + } + None => { + self.socket.send(buf).await?; + } + } + Ok(()) + } + + /// Receive one datagram from the bound peer. + /// + /// For the server, datagrams from a *different* source address are dropped (v1 serves a single + /// peer per connection), so this loops until a datagram from the latched peer arrives. + async fn recv_dgram(&self) -> io::Result> { + let mut buf = vec![0u8; RECV_BUF]; + match self.peer { + Some(expected) => loop { + let (n, from) = self.socket.recv_from(&mut buf).await?; + if from == expected { + buf.truncate(n); + return Ok(buf); + } + // Datagram from an unrelated source: ignore (single-peer connection). + }, + None => { + let n = self.socket.recv(&mut buf).await?; + buf.truncate(n); + Ok(buf) + } + } + } +} + +// --------------------------------------------------------------------------------------------- +// Reliable handshake adapter +// --------------------------------------------------------------------------------------------- + +/// A reliable, ordered byte stream over UDP — **for the handshake phase only**. +/// +/// The Aura proto handshake is written against [`AsyncRead`] + [`AsyncWrite`] and exchanges a small +/// number of whole framed messages (a DTLS-style "flight" model). This adapter makes that work over +/// lossy/reordering UDP with the minimum machinery needed for correctness: +/// +/// * **Write / message boundaries.** Written bytes are buffered. We parse the 5-byte Aura frame +/// header (`msg_type || len(u24) || version`) to learn each message's total size (`5 + len`); once +/// a whole message is buffered we emit exactly one HS datagram carrying the next outgoing `hs_seq`. +/// Boundaries come from the header, never from `flush`, so a writer that does not flush per message +/// still produces one datagram per message. +/// * **Outgoing reliability.** Every sent HS datagram is retained in an ordered map keyed by +/// `hs_seq`. Every [`UdpOpts::hs_rto`] all still-unacked datagrams are retransmitted, until either +/// they are acked or [`UdpOpts::hs_timeout`] elapses (→ error). +/// * **Acks.** Every HS datagram we send carries `ack_upto` = the highest *contiguous* `hs_seq` we +/// have received. On receiving an HS datagram we drop our unacked entries with `hs_seq <= +/// ack_upto`. If we have new data to acknowledge but nothing to send, we emit a bare HS datagram +/// (empty `msg_bytes`) so the peer can prune. +/// * **Incoming ordering.** Received HS payloads are buffered in a map keyed by `hs_seq` and handed +/// to the reader strictly in contiguous order; duplicates are dropped; the contiguous-received +/// counter (used for `ack_upto`) advances as gaps fill. +/// +/// The adapter shares the [`PeerSocket`] via `Arc` so the same socket can carry DATA afterwards. All +/// mutable protocol state lives behind one `Mutex` ([`HsState`]); the async read/write methods lock +/// it briefly. This is not a hot path (a handful of small messages), so simplicity wins over +/// lock-free cleverness. +struct ReliableHsAdapter { + socket: Arc, + state: Arc>, + /// Signalled by `poll_write` when new bytes are buffered, so the driver flushes promptly without + /// busy-polling. + write_notify: Arc, +} + +/// All mutable state of the reliable handshake adapter. +struct HsState { + // --- outgoing --- + /// Buffer of bytes written by the proto layer but not yet framed into a full message. + out_partial: Vec, + /// Next sequence number to assign to an outgoing HS datagram. + next_send_seq: u16, + /// Unacked sent HS datagrams (`hs_seq -> msg_bytes`), ordered for deterministic retransmit. + unacked: BTreeMap>, + + // --- incoming --- + /// Reassembly buffer of received-but-not-yet-delivered HS payloads (`hs_seq -> msg_bytes`). + in_buf: BTreeMap>, + /// Next `hs_seq` we expect to deliver to the reader (everything below is already delivered). + next_deliver_seq: u16, + /// Bytes delivered in order but not yet copied out by a (short) `poll_read` buffer. + ready: Vec, + /// Read cursor into `ready`. + ready_pos: usize, + /// Waker of a reader parked because no bytes were ready; the driver wakes it after pumping new + /// contiguous data into `ready`. + read_waker: Option, +} + +impl HsState { + fn new() -> Self { + Self { + out_partial: Vec::new(), + next_send_seq: 0, + unacked: BTreeMap::new(), + in_buf: BTreeMap::new(), + next_deliver_seq: 0, + ready: Vec::new(), + ready_pos: 0, + read_waker: None, + } + } + + /// The on-wire `ack_upto` value: the highest *contiguous* `hs_seq` received so far, or + /// [`ACK_NONE`] if nothing has been received yet. + /// + /// `next_deliver_seq` is the first not-yet-received contiguous seq, so the highest contiguous + /// received seq is `next_deliver_seq - 1`; before anything is received we send [`ACK_NONE`]. + fn ack_upto(&self) -> u16 { + if self.next_deliver_seq == 0 { + ACK_NONE + } else { + self.next_deliver_seq - 1 + } + } + + /// Drop unacked outgoing entries the peer has acknowledged via the wire `ack_upto` value. + /// + /// [`ACK_NONE`] means "prune nothing"; otherwise every entry with `hs_seq <= ack_upto` is + /// removed (cumulative ack). + fn prune_acked(&mut self, ack_upto: u16) { + if ack_upto == ACK_NONE { + return; + } + // Retain only entries strictly greater than ack_upto (the cumulative ack covers <=). + self.unacked = self.unacked.split_off(&(ack_upto + 1)); + } + + /// Integrate a received HS payload at `seq`, advancing contiguous delivery. + fn accept_incoming(&mut self, seq: u16, msg: Vec) { + if seq < self.next_deliver_seq { + // Already delivered (a retransmit): drop. + return; + } + self.in_buf.entry(seq).or_insert(msg); + // Drain contiguous run starting at next_deliver_seq into `ready`. + let before = self.ready.len(); + while let Some(bytes) = self.in_buf.remove(&self.next_deliver_seq) { + self.ready.extend_from_slice(&bytes); + self.next_deliver_seq = self.next_deliver_seq.wrapping_add(1); + } + // If we just made new bytes available, wake any parked reader so the handshake can proceed. + if self.ready.len() > before { + if let Some(w) = self.read_waker.take() { + w.wake(); + } + } + } +} + +impl ReliableHsAdapter { + fn new( + socket: Arc, + state: Arc>, + write_notify: Arc, + ) -> Self { + Self { + socket, + state, + write_notify, + } + } + + /// Build and send one HS datagram carrying `msg_bytes` at sequence `seq` with the current ack. + async fn send_hs(socket: &PeerSocket, seq: u16, ack_upto: u16, msg_bytes: &[u8]) { + let mut dg = Vec::with_capacity(HS_PREFIX_LEN + msg_bytes.len()); + dg.push(TYPE_HS); + dg.extend_from_slice(&seq.to_be_bytes()); + dg.extend_from_slice(&ack_upto.to_be_bytes()); + dg.extend_from_slice(msg_bytes); + // Best-effort: a dropped datagram is recovered by retransmit, so log-and-ignore send errors. + let _ = socket.send_dgram(&dg).await; + } + + /// Emit any whole messages currently buffered in `out_partial` as new HS datagrams. + async fn flush_outgoing(&self) { + loop { + let to_send = { + let mut st = self.state.lock().await; + // Need at least a full header to know the message length. + if st.out_partial.len() < HEADER_LEN { + break; + } + let mut hdr = [0u8; HEADER_LEN]; + hdr.copy_from_slice(&st.out_partial[..HEADER_LEN]); + // A malformed header here is a programming error (the proto layer only writes valid + // frames); treat undecodable as "wait for more bytes" to stay safe. + let Ok((_ty, payload_len)) = decode_header(&hdr) else { + break; + }; + let total = HEADER_LEN + payload_len; + if st.out_partial.len() < total { + break; + } + let msg: Vec = st.out_partial.drain(..total).collect(); + let seq = st.next_send_seq; + st.next_send_seq = st.next_send_seq.wrapping_add(1); + let ack = st.ack_upto(); + st.unacked.insert(seq, msg.clone()); + (seq, ack, msg) + }; + Self::send_hs(&self.socket, to_send.0, to_send.1, &to_send.2).await; + } + } + + /// Receive and integrate exactly one HS datagram from the socket. Prunes our retransmit queue by + /// the peer's cumulative ack and delivers any carried message into the reorder buffer. Non-HS + /// datagrams (e.g. a stray DATA) and truncated datagrams are ignored during the handshake. + async fn pump_one_incoming(&self) -> io::Result<()> { + let dg = self.socket.recv_dgram().await?; + if dg.is_empty() || dg[0] != TYPE_HS || dg.len() < HS_PREFIX_LEN { + return Ok(()); + } + let seq = u16::from_be_bytes([dg[1], dg[2]]); + let ack_upto = u16::from_be_bytes([dg[3], dg[4]]); + let msg = dg[HS_PREFIX_LEN..].to_vec(); + + let mut st = self.state.lock().await; + // The peer's cumulative ack prunes our retransmit queue. + st.prune_acked(ack_upto); + // An empty msg is a bare ack datagram (nothing to deliver); otherwise integrate it (which + // wakes a parked reader if it completes a contiguous run). + if !msg.is_empty() { + st.accept_incoming(seq, msg); + } + Ok(()) + } + + /// Send a bare ack datagram (empty message) so the peer can prune its retransmit queue when we + /// have nothing else to send. A bare ack does not consume/advance our outgoing sequence number; + /// it carries the current `next_send_seq` only as a placeholder, which the peer ignores because + /// the message is empty. + async fn send_bare_ack(&self) { + let (seq, ack) = { + let st = self.state.lock().await; + (st.next_send_seq, st.ack_upto()) + }; + Self::send_hs(&self.socket, seq, ack, &[]).await; + } + + /// Retransmit all currently-unacked HS datagrams (called on the RTO timer), each carrying the + /// latest cumulative ack so a retransmit doubles as a fresh ack. + async fn retransmit_unacked(&self) { + let (ack, batch) = { + let st = self.state.lock().await; + let batch: Vec<(u16, Vec)> = + st.unacked.iter().map(|(k, v)| (*k, v.clone())).collect(); + (st.ack_upto(), batch) + }; + for (seq, msg) in batch { + Self::send_hs(&self.socket, seq, ack, &msg).await; + } + } + + /// If we have received at least one contiguous message but currently have nothing queued to send + /// (no partial outgoing message), emit a bare ack so the peer can prune its retransmit queue. + async fn maybe_bare_ack(&self) { + let should = { + let st = self.state.lock().await; + st.next_deliver_seq > 0 && st.out_partial.is_empty() + }; + if should { + self.send_bare_ack().await; + } + } +} + +// `AsyncWrite`: buffer bytes and signal the driver; the actual datagram emission happens in the +// driver's `flush_outgoing` (which parses message boundaries). `poll_write` only appends to +// `out_partial` and notifies, so it never blocks or awaits. We take the state lock with `try_lock` +// (held only for trivial, non-await sections elsewhere) and return Pending+self-wake if momentarily +// contended. +impl AsyncWrite for ReliableHsAdapter { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + // Append to the partial buffer under the lock. The lock is only ever held for trivial, + // non-await critical sections, so `blocking_lock` cannot deadlock the runtime here in + // practice; but to be safe on the async runtime we use `try_lock` and report progress. + match self.state.try_lock() { + Ok(mut st) => { + st.out_partial.extend_from_slice(buf); + drop(st); + // Wake the driver so it frames + emits any now-complete message promptly. + self.write_notify.notify_one(); + Poll::Ready(Ok(buf.len())) + } + Err(_) => { + // Contended (the driver holds it momentarily): ask the caller to retry. + _cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // Datagram emission is driven externally (see `run_handshake`); nothing to force here. + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} + +// `AsyncRead`: hand out already-delivered contiguous bytes from `ready`. If none are ready, register +// the waker and return Pending; the driver task wakes us after pumping new incoming datagrams. +impl AsyncRead for ReliableHsAdapter { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + match self.state.try_lock() { + Ok(mut st) => { + if st.ready_pos < st.ready.len() { + let avail = st.ready.len() - st.ready_pos; + let n = avail.min(buf.remaining()); + let start = st.ready_pos; + buf.put_slice(&st.ready[start..start + n]); + st.ready_pos += n; + // Compact occasionally so `ready` does not grow unbounded across the handshake. + if st.ready_pos == st.ready.len() { + st.ready.clear(); + st.ready_pos = 0; + } + Poll::Ready(Ok(())) + } else { + // Nothing buffered yet; stash our waker so the driver can wake us when a + // contiguous run of incoming HS bytes becomes available. + st.read_waker = Some(cx.waker().clone()); + Poll::Pending + } + } + Err(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } +} + +// --------------------------------------------------------------------------------------------- +// Driving the handshake to completion over the adapter +// --------------------------------------------------------------------------------------------- + +/// Outcome of running the reliable handshake: the established datagram parts plus the shared socket. +struct Established { + sender: DatagramSender, + receiver: DatagramReceiver, + peer_id: Option, + socket: Arc, +} + +/// Run a handshake closure over the reliable adapter, driving I/O concurrently. +/// +/// `run_hs` is either [`client_handshake`] or [`server_handshake`] partially applied with config; it +/// receives the adapter's reader and writer (two handles sharing `state` + `write_notify`) and +/// returns the established [`aura_proto::Session`] reduced to its datagram parts. `state` may be +/// pre-seeded (the server seeds the client's first datagram before calling this). +/// +/// We spawn nothing: the handshake future and the I/O driver are raced with `tokio::select!` in a +/// loop so that (a) outgoing whole messages are framed and flushed to datagrams as soon as +/// `poll_write` signals `write_notify`, (b) incoming datagrams are pumped into the reorder buffer and +/// prune the retransmit queue via their acks, and (c) the RTO timer retransmits unacked flights — +/// all while the handshake future makes progress. The handshake completing wins the race; a brief +/// linger then guards the final flight. An overall deadline bounds the whole thing. +async fn run_reliable_handshake( + socket: Arc, + state: Arc>, + opts: UdpOpts, + run_hs: F, +) -> anyhow::Result +where + F: FnOnce(AdapterRead, AdapterWrite) -> Fut, + Fut: std::future::Future< + Output = Result<(DatagramSender, DatagramReceiver, Option), aura_proto::ProtoError>, + >, +{ + let write_notify = Arc::new(tokio::sync::Notify::new()); + let reader = AdapterRead(ReliableHsAdapter::new( + socket.clone(), + state.clone(), + write_notify.clone(), + )); + let writer = AdapterWrite(ReliableHsAdapter::new( + socket.clone(), + state.clone(), + write_notify.clone(), + )); + let driver = ReliableHsAdapter::new(socket.clone(), state.clone(), write_notify.clone()); + + let hs_fut = run_hs(reader, writer); + tokio::pin!(hs_fut); + + let deadline = tokio::time::sleep(opts.hs_timeout); + tokio::pin!(deadline); + + let mut rto = tokio::time::interval(opts.hs_rto); + rto.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + rto.tick().await; // skip the immediate first tick + + // If `state` was pre-seeded (server case), respond to it immediately rather than waiting for the + // first timer/recv: flush any reply the handshake future already queued and ack the seed. + driver.flush_outgoing().await; + driver.maybe_bare_ack().await; + + loop { + tokio::select! { + // The handshake future: when it completes we are done (after a short linger). + res = &mut hs_fut => { + let (sender, receiver, peer_id) = res?; + // The handshake's *final* message (e.g. the server's Finished) is written via + // `poll_write` as the future resolves, so it is still sitting in `out_partial` and + // was never emitted (this branch won the race against `write_notify`). Flush it now, + // then linger to guard it against loss. + driver.flush_outgoing().await; + linger_retransmit(&driver, opts).await; + return Ok(Established { sender, receiver, peer_id, socket }); + } + // A write was buffered: frame + emit any now-complete outgoing message(s) promptly. + _ = write_notify.notified() => { + driver.flush_outgoing().await; + } + // Pump one incoming datagram, then flush replies and ack so the peer can prune. + r = driver.pump_one_incoming() => { + r?; + driver.flush_outgoing().await; + driver.maybe_bare_ack().await; + } + // RTO: retransmit everything still unacked (also flushes any pending writes first). + _ = rto.tick() => { + driver.flush_outgoing().await; + driver.retransmit_unacked().await; + } + // Overall deadline. + _ = &mut deadline => { + anyhow::bail!("UDP handshake timed out after {:?}", opts.hs_timeout); + } + } + } +} + +/// Briefly resend the last unacked flight after the handshake returns, so a lost final flight is +/// recovered. Stops early if there is nothing unacked. +async fn linger_retransmit(adapter: &ReliableHsAdapter, opts: UdpOpts) { + let rounds = 3u32; + for _ in 0..rounds { + { + let st = adapter.state.lock().await; + if st.unacked.is_empty() { + return; + } + } + adapter.retransmit_unacked().await; + tokio::time::sleep(opts.hs_rto.min(opts.hs_linger / rounds)).await; + } +} + +// --------------------------------------------------------------------------------------------- +// Reader / Writer newtypes given to the proto handshake +// --------------------------------------------------------------------------------------------- + +/// The [`AsyncRead`] half handed to the proto handshake (delegates to the shared adapter state). +struct AdapterRead(ReliableHsAdapter); +/// The [`AsyncWrite`] half handed to the proto handshake (delegates to the shared adapter state). +struct AdapterWrite(ReliableHsAdapter); + +impl AsyncRead for AdapterRead { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl AsyncWrite for AdapterWrite { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} + +// --------------------------------------------------------------------------------------------- +// Public connection / server / client +// --------------------------------------------------------------------------------------------- + +/// An established Aura connection carried over **plain UDP**, exposed as a packet pipe. +/// +/// Implements [`aura_proto::PacketConnection`]: outbound packets are sealed as [`Frame::Data`] on +/// `stream_id 0` and shipped one-per-UDP-datagram; inbound DATA datagrams are opened and their +/// payload returned. `Ping` is answered with `Pong`, stray `Pong` is ignored, and a `Close` frame +/// surfaces as an error. Late handshake retransmits (`0x01` HS datagrams) seen on the data path are +/// dropped. Send and receive use **separate** [`tokio::sync::Mutex`]es, so the two directions run +/// concurrently. +pub struct UdpConnection { + socket: Arc, + sender: Mutex, + receiver: Mutex, + peer_id: Option, + opts: UdpOpts, +} + +impl UdpConnection { + fn from_established(est: Established, opts: UdpOpts) -> Self { + Self { + socket: est.socket, + sender: Mutex::new(est.sender), + receiver: Mutex::new(est.receiver), + peer_id: est.peer_id, + opts, + } + } + + /// The verified identity (Common Name) of the peer learned during the handshake (the server + /// learns the client id; the client learns the server name). + #[must_use] + pub fn peer_id(&self) -> Option<&str> { + self.peer_id.as_deref() + } + + /// Wrap this connection as a trait object for the tunnel/dialer layer. + #[must_use] + pub fn into_dyn(self) -> Arc { + Arc::new(self) + } +} + +#[async_trait] +impl PacketConnection for UdpConnection { + async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> { + let rec = { + let mut tx = self.sender.lock().await; + tx.seal(&Frame::Data { + stream_id: 0, + payload: Bytes::copy_from_slice(packet), + }) + }; + let rec_len = rec.len(); + debug_assert!( + rec_len <= u16::MAX as usize, + "sealed record exceeds u16 len" + ); + + let mut dg = Vec::with_capacity(DATA_PREFIX_LEN + rec_len); + dg.push(TYPE_DATA); + dg.extend_from_slice(&(rec_len as u16).to_be_bytes()); + dg.extend_from_slice(&rec); + + if self.opts.obfuscate { + // Pad the *whole datagram* up to the next HTTPS-like size bucket with random bytes. The + // receiver reads exactly `rec_len` of the sealed record and ignores the trailing pad. + let target = padding::next_https_bucket(dg.len()); + if target > dg.len() { + let pad = target - dg.len(); + let mut pad_bytes = vec![0u8; pad]; + use rand::RngCore; + rand::thread_rng().fill_bytes(&mut pad_bytes); + dg.extend_from_slice(&pad_bytes); + } + } + + self.socket.send_dgram(&dg).await?; + Ok(()) + } + + async fn recv_packet(&self) -> anyhow::Result> { + let mut rx = self.receiver.lock().await; + loop { + let dg = self.socket.recv_dgram().await?; + if dg.is_empty() { + continue; + } + match dg[0] { + TYPE_DATA => { + if dg.len() < DATA_PREFIX_LEN { + continue; // truncated header + } + let rec_len = u16::from_be_bytes([dg[1], dg[2]]) as usize; + let start = DATA_PREFIX_LEN; + let end = start + rec_len; + if dg.len() < end { + continue; // truncated record + } + let rec = &dg[start..end]; + let frame = match rx.open(rec) { + Ok(f) => f, + // A replayed/old/corrupt datagram is dropped (defensive: UDP delivers dupes). + Err(_) => continue, + }; + match frame { + Frame::Data { payload, .. } => return Ok(payload.to_vec()), + Frame::Ping { seq } => { + // Answer keep-alive Pings on the same datagram path. + let rec = { + // The send sequence lives in the sender half; lock it briefly. + let mut tx = self.sender.lock().await; + tx.seal(&Frame::Pong { seq }) + }; + let mut out = Vec::with_capacity(DATA_PREFIX_LEN + rec.len()); + out.push(TYPE_DATA); + out.extend_from_slice(&(rec.len() as u16).to_be_bytes()); + out.extend_from_slice(&rec); + self.socket.send_dgram(&out).await?; + } + Frame::Pong { .. } => continue, + Frame::Close { code, reason } => { + anyhow::bail!("peer closed connection (code {code}): {reason}"); + } + } + } + // Late handshake retransmit on the data path: ignore. + TYPE_HS => continue, + _ => continue, // unknown type: ignore + } + } + } +} + +/// An Aura UDP server: a bound UDP socket that accepts one authenticated [`UdpConnection`] per +/// [`accept`](UdpServer::accept). +/// +/// v1 serves a **single peer per accepted connection** (see the module docs). Each `accept` waits +/// for a client's first HS datagram, latches that source address, runs [`server_handshake`] over the +/// reliable adapter, and returns the connection. To serve multiple clients, bind multiple sockets or +/// add a per-source demuxer (out of scope for v1). +pub struct UdpServer { + socket: Arc, + /// A std clone of the same bound socket, kept solely so [`accept`](UdpServer::accept) can safely + /// `try_clone` an independent handle for the per-connection [`PeerSocket`] (no `unsafe`). + std_socket: std::net::UdpSocket, + proto_cfg: Arc, + opts: UdpOpts, +} + +impl UdpServer { + /// Bind a UDP server on `local` (use `127.0.0.1:0` / `0.0.0.0:0` for an OS-assigned port, read + /// back with [`UdpServer::local_addr`]). + /// + /// `proto_cfg` is the inner Aura handshake config (CA + server leaf cert/key) used to mutually + /// authenticate each client. `opts` controls obfuscation and handshake timers. + /// + /// # Errors + /// Returns an [`io::Error`] if the UDP socket cannot bind. + pub fn bind(local: SocketAddr, proto_cfg: ServerConfig, opts: UdpOpts) -> io::Result { + let socket = std::net::UdpSocket::bind(local)?; + socket.set_nonblocking(true)?; + // Keep a safe std clone for per-connection handles; both refer to the same bound port. + let std_socket = socket.try_clone()?; + let socket = UdpSocket::from_std(socket)?; + Ok(Self { + socket: Arc::new(socket), + std_socket, + proto_cfg: Arc::new(proto_cfg), + opts, + }) + } + + /// The local address (including the OS-assigned port) this server is bound to. + /// + /// # Errors + /// Returns an [`io::Error`] if the socket address cannot be read. + pub fn local_addr(&self) -> io::Result { + self.socket.local_addr() + } + + /// Accept the next client: wait for its first HS datagram, then run the Aura mutual-auth + /// handshake bound to that peer over the reliable UDP adapter. + /// + /// Returns a ready [`UdpConnection`] whose [`peer_id`](UdpConnection::peer_id) is the verified + /// client Common Name. + /// + /// # Errors + /// Returns an error if receiving fails or the Aura handshake fails (e.g. the client's + /// certificate does not verify against the CA, or the handshake times out). + pub async fn accept(&self) -> anyhow::Result { + // Wait for the first HS datagram and latch the client's address. We must NOT consume the + // datagram's content blindly: re-deliver it to the handshake by seeding the reorder buffer. + let (peer_addr, first) = loop { + let mut buf = vec![0u8; RECV_BUF]; + let (n, from) = self.socket.recv_from(&mut buf).await?; + buf.truncate(n); + if !buf.is_empty() && buf[0] == TYPE_HS && buf.len() >= HS_PREFIX_LEN { + break (from, buf); + } + // Ignore stray non-HS datagrams while waiting for a fresh client. + }; + + // A peer-bound view over the same bound port: safely `try_clone` the std socket and rebuild + // an independent tokio handle for it. Both the handshake adapter and the data path use this + // handle, addressing the latched client and ignoring any stray sources. + let peer_std = self.std_socket.try_clone()?; + peer_std.set_nonblocking(true)?; + let peer_socket = Arc::new(PeerSocket { + socket: UdpSocket::from_std(peer_std)?, + peer: Some(peer_addr), + }); + + // Seed the reorder buffer with the first datagram so its ClientHello is not lost. + let state = Arc::new(Mutex::new(HsState::new())); + seed_first_hs(&state, &first).await; + + let cfg = self.proto_cfg.clone(); + let opts = self.opts; + let est = run_reliable_handshake(peer_socket, state, opts, move |r, w| async move { + let session = server_handshake(r, w, &cfg).await?; + Ok(session.into_datagram_parts()) + }) + .await?; + + Ok(UdpConnection::from_established(est, opts)) + } +} + +/// An Aura UDP client entry point. +pub struct UdpClient; + +impl UdpClient { + /// Connect to an Aura UDP server at `server`, running the Aura mutual-auth handshake over the + /// reliable UDP adapter on a fresh ephemeral socket connected to the server. + /// + /// `proto_cfg` is the CA + client leaf cert/key + expected server name for the inner handshake. + /// `opts` controls obfuscation and handshake timers. + /// + /// # Errors + /// Returns an error if the socket cannot bind/connect or the Aura handshake fails (bad server + /// cert chain, SAN mismatch, or timeout). + pub async fn connect( + server: SocketAddr, + proto_cfg: ClientConfig, + opts: UdpOpts, + ) -> anyhow::Result { + // Bind an ephemeral local socket matching the server's address family and connect it. + let bind_addr: SocketAddr = if server.is_ipv4() { + "0.0.0.0:0".parse().expect("valid v4 bind addr") + } else { + "[::]:0".parse().expect("valid v6 bind addr") + }; + let std_sock = std::net::UdpSocket::bind(bind_addr)?; + std_sock.set_nonblocking(true)?; + let socket = UdpSocket::from_std(std_sock)?; + socket.connect(server).await?; + + let peer_socket = Arc::new(PeerSocket { + socket, + peer: None, // connected socket: plain send/recv to the server + }); + + // Fresh (unseeded) state: the client speaks first (ClientHello). + let state = Arc::new(Mutex::new(HsState::new())); + let est = run_reliable_handshake(peer_socket, state, opts, move |r, w| async move { + let session = client_handshake(r, w, &proto_cfg).await?; + Ok(session.into_datagram_parts()) + }) + .await?; + + Ok(UdpConnection::from_established(est, opts)) + } +} + +// --------------------------------------------------------------------------------------------- +// Internal helpers for socket sharing and seeding +// --------------------------------------------------------------------------------------------- + +/// Seed an [`HsState`] with the server's first received HS datagram so its message is delivered to +/// the handshake reader in order (its `hs_seq` is 0 for a fresh client). +async fn seed_first_hs(state: &Arc>, dg: &[u8]) { + if dg.len() < HS_PREFIX_LEN || dg[0] != TYPE_HS { + return; + } + let seq = u16::from_be_bytes([dg[1], dg[2]]); + let ack_upto = u16::from_be_bytes([dg[3], dg[4]]); + let msg = dg[HS_PREFIX_LEN..].to_vec(); + let mut st = state.lock().await; + st.prune_acked(ack_upto); + if !msg.is_empty() { + st.accept_incoming(seq, msg); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + /// Drain all currently-ready in-order bytes from the state. + fn drain_ready(st: &mut HsState) -> Vec { + let out = st.ready[st.ready_pos..].to_vec(); + st.ready.clear(); + st.ready_pos = 0; + out + } + + #[test] + fn reorder_buffer_delivers_in_sequence_order() { + let mut st = HsState::new(); + // Deliver seq 2 and 1 before 0: nothing should be ready until 0 arrives. + st.accept_incoming(2, b"ccc".to_vec()); + st.accept_incoming(1, b"bbb".to_vec()); + assert!(drain_ready(&mut st).is_empty(), "no contiguous run yet"); + assert_eq!(st.next_deliver_seq, 0); + + // Now seq 0 fills the gap: the whole contiguous run 0,1,2 is delivered in order. + st.accept_incoming(0, b"aaa".to_vec()); + assert_eq!(drain_ready(&mut st), b"aaabbbccc"); + assert_eq!( + st.next_deliver_seq, 3, + "contiguous counter advanced past all three" + ); + } + + #[test] + fn duplicate_datagrams_are_dropped() { + let mut st = HsState::new(); + st.accept_incoming(0, b"x".to_vec()); + assert_eq!(drain_ready(&mut st), b"x"); + // A retransmit of an already-delivered seq must not be delivered again. + st.accept_incoming(0, b"x".to_vec()); + assert!( + drain_ready(&mut st).is_empty(), + "duplicate of delivered seq dropped" + ); + // A duplicate of a buffered-but-not-yet-delivered seq is also coalesced (no double count). + st.accept_incoming(2, b"z".to_vec()); + st.accept_incoming(2, b"z".to_vec()); + st.accept_incoming(1, b"y".to_vec()); + assert_eq!(drain_ready(&mut st), b"yz"); + assert_eq!(st.next_deliver_seq, 3); + } + + #[test] + fn ack_upto_reports_highest_contiguous_or_sentinel() { + let mut st = HsState::new(); + // Nothing received yet => sentinel. + assert_eq!(st.ack_upto(), ACK_NONE); + st.accept_incoming(0, b"a".to_vec()); + assert_eq!(st.ack_upto(), 0, "highest contiguous is 0"); + // A gap (seq 2 arrives, 1 missing) does not advance the cumulative ack past the gap. + st.accept_incoming(2, b"c".to_vec()); + assert_eq!(st.ack_upto(), 0, "still 0: seq 1 is missing"); + // Filling the gap advances it to the new contiguous high. + st.accept_incoming(1, b"b".to_vec()); + assert_eq!(st.ack_upto(), 2); + } + + #[test] + fn prune_acked_is_cumulative_and_respects_sentinel() { + let mut st = HsState::new(); + st.unacked.insert(0, vec![0]); + st.unacked.insert(1, vec![1]); + st.unacked.insert(2, vec![2]); + + // Sentinel prunes nothing. + st.prune_acked(ACK_NONE); + assert_eq!(st.unacked.len(), 3); + + // ack_upto = 1 removes seq 0 and 1 (cumulative <=), keeps 2. + st.prune_acked(1); + assert_eq!(st.unacked.keys().copied().collect::>(), vec![2]); + + // ack_upto = 2 removes the rest. + st.prune_acked(2); + assert!(st.unacked.is_empty()); + } + + #[test] + fn flush_outgoing_frames_one_datagram_per_whole_message() { + // Emulate the proto layer writing two whole frames back-to-back into out_partial, possibly + // split across writes, and assert flush_outgoing pulls exactly one message at a time. + use aura_proto::frame::{encode_header, MsgType}; + + let mut st = HsState::new(); + // Message A: ClientHello-ish, 4-byte payload. + let mut a = encode_header(MsgType::Data, 4).unwrap().to_vec(); + a.extend_from_slice(&[1, 2, 3, 4]); + // Message B: 2-byte payload. + let mut b = encode_header(MsgType::Data, 2).unwrap().to_vec(); + b.extend_from_slice(&[9, 9]); + + // Write A fully + only part of B's header: only A should be framable. + st.out_partial.extend_from_slice(&a); + st.out_partial.extend_from_slice(&b[..3]); + + // Replicate flush_outgoing's framing logic (the network send is elsewhere/async). + let mut framed: Vec> = Vec::new(); + loop { + if st.out_partial.len() < HEADER_LEN { + break; + } + let mut hdr = [0u8; HEADER_LEN]; + hdr.copy_from_slice(&st.out_partial[..HEADER_LEN]); + let (_ty, plen) = decode_header(&hdr).unwrap(); + let total = HEADER_LEN + plen; + if st.out_partial.len() < total { + break; + } + framed.push(st.out_partial.drain(..total).collect()); + } + assert_eq!( + framed.len(), + 1, + "only the complete message A should be framed" + ); + assert_eq!(framed[0], a); + + // Now append the rest of B: it becomes framable as exactly one more message. + st.out_partial.extend_from_slice(&b[3..]); + let mut hdr = [0u8; HEADER_LEN]; + hdr.copy_from_slice(&st.out_partial[..HEADER_LEN]); + let (_ty, plen) = decode_header(&hdr).unwrap(); + let total = HEADER_LEN + plen; + assert_eq!(st.out_partial.len(), total); + let msg_b: Vec = st.out_partial.drain(..total).collect(); + assert_eq!(msg_b, b); + } +} diff --git a/crates/aura-transport/tests/udp_loopback.rs b/crates/aura-transport/tests/udp_loopback.rs new file mode 100644 index 0000000..e901ccf --- /dev/null +++ b/crates/aura-transport/tests/udp_loopback.rs @@ -0,0 +1,342 @@ +//! End-to-end integration tests for the Aura **UDP** transport (`aura_transport::udp`). +//! +//! These prove that aura-crypto + aura-pki + aura-proto + the new UDP backend integrate: we mint a +//! real CA, issue server/client certs, run the Aura mutual-auth handshake over **plain UDP** via the +//! reliable handshake adapter, and then push application packets as unreliable datagrams through the +//! [`PacketConnection`] API. +//! +//! * [`udp_loopback_end_to_end`] — clean loopback: handshake + several packets each direction +//! (including a ~1300-byte packet and an empty packet), with obfuscation enabled to exercise the +//! DATA padding path; asserts payload integrity and the verified `peer_id`. +//! * [`udp_handshake_survives_loss_and_reorder`] — drives the whole handshake (and then data) +//! through an in-process forwarder that **drops ~30% of datagrams and reorders** the survivors, +//! proving the DTLS-flight-style reliability (retransmit + cumulative ack + reorder/dedup) actually +//! recovers a lossy handshake. This is a full lossy-socket harness, not just a unit test of the +//! adapter logic. + +use std::net::SocketAddr; +use std::sync::Arc; +use std::time::Duration; + +use aura_pki::AuraCa; +use aura_proto::{ClientConfig, PacketConnection, ServerConfig}; +use aura_transport::{UdpClient, UdpConnection, UdpOpts, UdpServer}; + +use tokio::net::UdpSocket; + +/// DNS name baked into the server cert SAN and verified by the inner Aura handshake. +const SERVER_NAME: &str = "localhost"; +/// Verified client identity (the CN the server should learn). +const CLIENT_ID: &str = "client-udp-001"; + +/// Mint a fresh CA and the matching server + client handshake configs. +fn make_configs() -> (ServerConfig, ClientConfig) { + let ca = AuraCa::generate("Aura UDP Test CA").expect("generate CA"); + let server_cert = ca + .issue_server_cert(SERVER_NAME) + .expect("issue server cert"); + let client_cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert"); + let ca_pem = ca.ca_cert_pem(); + + let server_cfg = ServerConfig { + ca_cert_pem: ca_pem.clone(), + server_cert_pem: server_cert.cert_pem.clone(), + server_key_pem: server_cert.key_pem.clone(), + }; + let client_cfg = ClientConfig { + ca_cert_pem: ca_pem, + client_cert_pem: client_cert.cert_pem, + client_key_pem: client_cert.key_pem, + server_name: SERVER_NAME.to_string(), + }; + (server_cfg, client_cfg) +} + +/// Exchange packets both directions through an established pair and assert integrity. +async fn exchange_both_ways( + server: &Arc, + client: &Arc, +) { + // Client -> Server, including a ~1300-byte packet and an empty packet. + let c2s: Vec> = vec![ + b"hello over udp".to_vec(), + (0..=255u8).collect(), + vec![0x5Au8; 1300], // larger-than-most-buckets payload, single datagram + Vec::new(), // empty packet + b"trailing".to_vec(), + ]; + for pkt in &c2s { + client.send_packet(pkt).await.expect("client send"); + let got = server.recv_packet().await.expect("server recv"); + assert_eq!(&got, pkt, "client->server payload mismatch"); + } + + // Server -> Client. + let s2c: Vec> = vec![b"hello back".to_vec(), vec![0xA5u8; 999], b"bye".to_vec()]; + for pkt in &s2c { + server.send_packet(pkt).await.expect("server send"); + let got = client.recv_packet().await.expect("client recv"); + assert_eq!(&got, pkt, "server->client payload mismatch"); + } +} + +#[tokio::test] +async fn udp_loopback_end_to_end() { + let (server_cfg, client_cfg) = make_configs(); + + // Obfuscation ON to exercise the DATA padding path end-to-end. + let opts = UdpOpts { + obfuscate: true, + ..UdpOpts::default() + }; + + let server = + UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server"); + let server_addr = server.local_addr().expect("server local_addr"); + assert_ne!(server_addr.port(), 0, "OS should assign a real port"); + + // Run accept + connect concurrently. + let accept_task = tokio::spawn(async move { server.accept().await }); + let connect_task = + tokio::spawn(async move { UdpClient::connect(server_addr, client_cfg, opts).await }); + + let server_conn: UdpConnection = accept_task + .await + .expect("accept join") + .expect("server accept"); + let client_conn: UdpConnection = connect_task + .await + .expect("connect join") + .expect("client connect"); + + // Mutual auth must have established the verified peer identity (server learns the client CN). + assert_eq!( + server_conn.peer_id(), + Some(CLIENT_ID), + "server should learn the client's verified CN" + ); + assert_eq!( + client_conn.peer_id(), + Some(SERVER_NAME), + "client should record the server name" + ); + + let server_conn: Arc = Arc::new(server_conn); + let client_conn: Arc = Arc::new(client_conn); + + exchange_both_ways(&server_conn, &client_conn).await; + + // Concurrent full-duplex: both directions in flight at once. + let s = server_conn.clone(); + let c = client_conn.clone(); + let dup_server = tokio::spawn(async move { + s.send_packet(b"duplex-from-server").await.unwrap(); + s.recv_packet().await.unwrap() + }); + let dup_client = tokio::spawn(async move { + c.send_packet(b"duplex-from-client").await.unwrap(); + c.recv_packet().await.unwrap() + }); + assert_eq!(dup_server.await.unwrap(), b"duplex-from-client"); + assert_eq!(dup_client.await.unwrap(), b"duplex-from-server"); +} + +/// An in-process lossy + reordering UDP forwarder sitting between the client and the real server. +/// +/// The client connects to the forwarder's address; the forwarder relays datagrams to the server and +/// back, **dropping `drop_pct`%** of datagrams in each direction and **reordering** survivors by +/// applying a small random delay before re-sending. This exercises the handshake reliability layer +/// (retransmit, cumulative ack, reorder buffer, dedup) over a genuinely unreliable channel. +/// +/// Returns the address clients should connect to. The forwarder runs until the process ends (tests +/// are short-lived); it learns the client's address from the first datagram it receives. +async fn spawn_lossy_forwarder(server_addr: SocketAddr, drop_pct: u32) -> SocketAddr { + // `front` faces the client; `back` faces the server. + let front = UdpSocket::bind("127.0.0.1:0").await.expect("bind front"); + let back = UdpSocket::bind("127.0.0.1:0").await.expect("bind back"); + let front_addr = front.local_addr().expect("front addr"); + let front = Arc::new(front); + let back = Arc::new(back); + + // Shared latched client address (learned from the first datagram on `front`). + let client_addr: Arc>> = + Arc::new(tokio::sync::Mutex::new(None)); + + // Deterministic-ish PRNG so failures are reproducible-enough without an extra dep. We use a tiny + // xorshift seeded from the addresses; loss/reorder do not need cryptographic randomness. + let seed = (front_addr.port() as u64) << 16 | server_addr.port() as u64 | 0x9E37_79B9; + + // Client -> Server direction. + { + let front = front.clone(); + let back = back.clone(); + let client_addr = client_addr.clone(); + let mut rng = SmallRng::new(seed ^ 0xAAAA); + tokio::spawn(async move { + let mut buf = vec![0u8; 4096]; + loop { + let (n, from) = match front.recv_from(&mut buf).await { + Ok(v) => v, + Err(_) => continue, + }; + { + let mut ca = client_addr.lock().await; + if ca.is_none() { + *ca = Some(from); + } + } + let dg = buf[..n].to_vec(); + // Drop? + if rng.below(100) < drop_pct { + continue; + } + let back = back.clone(); + let delay = rng.below(8) as u64; // 0..8 ms jitter => reordering + tokio::spawn(async move { + if delay > 0 { + tokio::time::sleep(Duration::from_millis(delay)).await; + } + let _ = back.send_to(&dg, server_addr).await; + }); + } + }); + } + + // Server -> Client direction. + { + let front = front.clone(); + let back = back.clone(); + let client_addr = client_addr.clone(); + let mut rng = SmallRng::new(seed ^ 0x5555); + tokio::spawn(async move { + let mut buf = vec![0u8; 4096]; + loop { + let (n, _from) = match back.recv_from(&mut buf).await { + Ok(v) => v, + Err(_) => continue, + }; + let dest = { *client_addr.lock().await }; + let Some(dest) = dest else { continue }; + let dg = buf[..n].to_vec(); + if rng.below(100) < drop_pct { + continue; + } + let front = front.clone(); + let delay = rng.below(8) as u64; + tokio::spawn(async move { + if delay > 0 { + tokio::time::sleep(Duration::from_millis(delay)).await; + } + let _ = front.send_to(&dg, dest).await; + }); + } + }); + } + + front_addr +} + +/// Tiny xorshift64 PRNG for loss/reorder decisions (no external dependency needed). +struct SmallRng(u64); +impl SmallRng { + fn new(seed: u64) -> Self { + Self(seed | 1) + } + fn next_u64(&mut self) -> u64 { + let mut x = self.0; + x ^= x << 13; + x ^= x >> 7; + x ^= x << 17; + self.0 = x; + x + } + /// A value in `0..bound`. + fn below(&mut self, bound: u32) -> u32 { + (self.next_u64() % bound as u64) as u32 + } +} + +#[tokio::test] +async fn udp_handshake_survives_loss_and_reorder() { + let (server_cfg, client_cfg) = make_configs(); + let opts = UdpOpts::default(); // obfuscation off; default 250ms RTO / 10s deadline + + // Real server on loopback. + let server = + UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server"); + let server_addr = server.local_addr().expect("server local_addr"); + + // Lossy/reordering forwarder in front of it: drops ~30% and jitters survivors. + let proxy_addr = spawn_lossy_forwarder(server_addr, 30).await; + + // Accept on the real server; connect the client to the *proxy*. + let accept_task = tokio::spawn(async move { server.accept().await }); + let connect_task = + tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await }); + + let server_conn = server_conn_or_panic(accept_task).await; + let client_conn = client_conn_or_panic(connect_task).await; + + assert_eq!( + server_conn.peer_id(), + Some(CLIENT_ID), + "handshake completed through loss/reorder; server learned client CN" + ); + + // Data must flow through the lossy channel. Because the data path is intentionally unreliable + // (UDP datagrams), we retry each send a few times until one survives the ~30% drop — this checks + // the *data* path works end-to-end, not that it is itself reliable (it is not, by design). + let server_conn: Arc = Arc::new(server_conn); + let client_conn: Arc = Arc::new(client_conn); + + assert!( + deliver_with_retries(&client_conn, &server_conn, b"c2s through loss", 40).await, + "a client->server packet should eventually get through the lossy channel" + ); + assert!( + deliver_with_retries(&server_conn, &client_conn, b"s2c through loss", 40).await, + "a server->client packet should eventually get through the lossy channel" + ); +} + +/// Send `payload` from `tx` repeatedly until `rx` receives exactly it, or `max_tries` is exhausted. +/// +/// The data path is unreliable by design, so this retransmits at the application layer. We race each +/// receive against a short timeout so a dropped datagram does not block forever. +async fn deliver_with_retries( + tx: &Arc, + rx: &Arc, + payload: &[u8], + max_tries: u32, +) -> bool { + for _ in 0..max_tries { + tx.send_packet(payload).await.expect("send"); + match tokio::time::timeout(Duration::from_millis(150), rx.recv_packet()).await { + Ok(Ok(got)) if got == payload => return true, + // Either a timeout (dropped) or some other datagram: retry. + _ => continue, + } + } + false +} + +async fn server_conn_or_panic( + task: tokio::task::JoinHandle>, +) -> UdpConnection { + // Bound the whole handshake-through-loss with a generous ceiling so a hung test fails loudly. + tokio::time::timeout(Duration::from_secs(20), task) + .await + .expect("server accept did not finish within 20s (loss/reorder)") + .expect("accept join") + .expect("server accept") +} + +async fn client_conn_or_panic( + task: tokio::task::JoinHandle>, +) -> UdpConnection { + tokio::time::timeout(Duration::from_secs(20), task) + .await + .expect("client connect did not finish within 20s (loss/reorder)") + .expect("connect join") + .expect("client connect") +}