Files
AuraVPN/crates/aura-transport/tests/udp_loopback.rs
T
xah30 866b9f427a feat(transport): custom UDP post-quantum transport (own tunneling, no QUIC)
Aura's own data path over plain UDP, authenticated solely by the existing Aura
PQ handshake (hybrid X25519+ML-KEM-768 + mutual X.509) — no QUIC, no outer TLS.

- One UDP socket, two phases by type byte: 0x01 HS (reliable handshake), 0x02
  DATA (datagram records). HS = DTLS-flight reliability over UDP: per-message
  seq, cumulative acks, retransmit (RTO), reorder/dedup, post-handshake linger;
  message boundaries parsed from the 5-byte Aura header. DATA = one explicit-
  nonce AEAD record per datagram (seq||AEAD), replay-checked, optional padding to
  HTTPS size buckets (obfuscation).
- UdpServer/UdpClient/UdpConnection (impl PacketConnection, concurrent send/recv).
  v1: single peer per accept (multi-client demux is a follow-up).
- 5 adapter unit tests + udp loopback end-to-end (obfuscation on, 1300B/empty/
  duplex) + handshake-survives-30%-loss-and-reorder. No new deps. QUIC tests
  preserved. Whole workspace builds; clippy/fmt clean.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 19:10:43 +03:00

343 lines
13 KiB
Rust

//! End-to-end integration tests for the Aura **UDP** transport (`aura_transport::udp`).
//!
//! These prove that aura-crypto + aura-pki + aura-proto + the new UDP backend integrate: we mint a
//! real CA, issue server/client certs, run the Aura mutual-auth handshake over **plain UDP** via the
//! reliable handshake adapter, and then push application packets as unreliable datagrams through the
//! [`PacketConnection`] API.
//!
//! * [`udp_loopback_end_to_end`] — clean loopback: handshake + several packets each direction
//! (including a ~1300-byte packet and an empty packet), with obfuscation enabled to exercise the
//! DATA padding path; asserts payload integrity and the verified `peer_id`.
//! * [`udp_handshake_survives_loss_and_reorder`] — drives the whole handshake (and then data)
//! through an in-process forwarder that **drops ~30% of datagrams and reorders** the survivors,
//! proving the DTLS-flight-style reliability (retransmit + cumulative ack + reorder/dedup) actually
//! recovers a lossy handshake. This is a full lossy-socket harness, not just a unit test of the
//! adapter logic.
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use aura_pki::AuraCa;
use aura_proto::{ClientConfig, PacketConnection, ServerConfig};
use aura_transport::{UdpClient, UdpConnection, UdpOpts, UdpServer};
use tokio::net::UdpSocket;
/// DNS name baked into the server cert SAN and verified by the inner Aura handshake.
const SERVER_NAME: &str = "localhost";
/// Verified client identity (the CN the server should learn).
const CLIENT_ID: &str = "client-udp-001";
/// Mint a fresh CA and the matching server + client handshake configs.
fn make_configs() -> (ServerConfig, ClientConfig) {
let ca = AuraCa::generate("Aura UDP Test CA").expect("generate CA");
let server_cert = ca
.issue_server_cert(SERVER_NAME)
.expect("issue server cert");
let client_cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert");
let ca_pem = ca.ca_cert_pem();
let server_cfg = ServerConfig {
ca_cert_pem: ca_pem.clone(),
server_cert_pem: server_cert.cert_pem.clone(),
server_key_pem: server_cert.key_pem.clone(),
};
let client_cfg = ClientConfig {
ca_cert_pem: ca_pem,
client_cert_pem: client_cert.cert_pem,
client_key_pem: client_cert.key_pem,
server_name: SERVER_NAME.to_string(),
};
(server_cfg, client_cfg)
}
/// Exchange packets both directions through an established pair and assert integrity.
async fn exchange_both_ways(
server: &Arc<dyn PacketConnection>,
client: &Arc<dyn PacketConnection>,
) {
// Client -> Server, including a ~1300-byte packet and an empty packet.
let c2s: Vec<Vec<u8>> = vec![
b"hello over udp".to_vec(),
(0..=255u8).collect(),
vec![0x5Au8; 1300], // larger-than-most-buckets payload, single datagram
Vec::new(), // empty packet
b"trailing".to_vec(),
];
for pkt in &c2s {
client.send_packet(pkt).await.expect("client send");
let got = server.recv_packet().await.expect("server recv");
assert_eq!(&got, pkt, "client->server payload mismatch");
}
// Server -> Client.
let s2c: Vec<Vec<u8>> = vec![b"hello back".to_vec(), vec![0xA5u8; 999], b"bye".to_vec()];
for pkt in &s2c {
server.send_packet(pkt).await.expect("server send");
let got = client.recv_packet().await.expect("client recv");
assert_eq!(&got, pkt, "server->client payload mismatch");
}
}
#[tokio::test]
async fn udp_loopback_end_to_end() {
let (server_cfg, client_cfg) = make_configs();
// Obfuscation ON to exercise the DATA padding path end-to-end.
let opts = UdpOpts {
obfuscate: true,
..UdpOpts::default()
};
let server =
UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server");
let server_addr = server.local_addr().expect("server local_addr");
assert_ne!(server_addr.port(), 0, "OS should assign a real port");
// Run accept + connect concurrently.
let accept_task = tokio::spawn(async move { server.accept().await });
let connect_task =
tokio::spawn(async move { UdpClient::connect(server_addr, client_cfg, opts).await });
let server_conn: UdpConnection = accept_task
.await
.expect("accept join")
.expect("server accept");
let client_conn: UdpConnection = connect_task
.await
.expect("connect join")
.expect("client connect");
// Mutual auth must have established the verified peer identity (server learns the client CN).
assert_eq!(
server_conn.peer_id(),
Some(CLIENT_ID),
"server should learn the client's verified CN"
);
assert_eq!(
client_conn.peer_id(),
Some(SERVER_NAME),
"client should record the server name"
);
let server_conn: Arc<dyn PacketConnection> = Arc::new(server_conn);
let client_conn: Arc<dyn PacketConnection> = Arc::new(client_conn);
exchange_both_ways(&server_conn, &client_conn).await;
// Concurrent full-duplex: both directions in flight at once.
let s = server_conn.clone();
let c = client_conn.clone();
let dup_server = tokio::spawn(async move {
s.send_packet(b"duplex-from-server").await.unwrap();
s.recv_packet().await.unwrap()
});
let dup_client = tokio::spawn(async move {
c.send_packet(b"duplex-from-client").await.unwrap();
c.recv_packet().await.unwrap()
});
assert_eq!(dup_server.await.unwrap(), b"duplex-from-client");
assert_eq!(dup_client.await.unwrap(), b"duplex-from-server");
}
/// An in-process lossy + reordering UDP forwarder sitting between the client and the real server.
///
/// The client connects to the forwarder's address; the forwarder relays datagrams to the server and
/// back, **dropping `drop_pct`%** of datagrams in each direction and **reordering** survivors by
/// applying a small random delay before re-sending. This exercises the handshake reliability layer
/// (retransmit, cumulative ack, reorder buffer, dedup) over a genuinely unreliable channel.
///
/// Returns the address clients should connect to. The forwarder runs until the process ends (tests
/// are short-lived); it learns the client's address from the first datagram it receives.
async fn spawn_lossy_forwarder(server_addr: SocketAddr, drop_pct: u32) -> SocketAddr {
// `front` faces the client; `back` faces the server.
let front = UdpSocket::bind("127.0.0.1:0").await.expect("bind front");
let back = UdpSocket::bind("127.0.0.1:0").await.expect("bind back");
let front_addr = front.local_addr().expect("front addr");
let front = Arc::new(front);
let back = Arc::new(back);
// Shared latched client address (learned from the first datagram on `front`).
let client_addr: Arc<tokio::sync::Mutex<Option<SocketAddr>>> =
Arc::new(tokio::sync::Mutex::new(None));
// Deterministic-ish PRNG so failures are reproducible-enough without an extra dep. We use a tiny
// xorshift seeded from the addresses; loss/reorder do not need cryptographic randomness.
let seed = (front_addr.port() as u64) << 16 | server_addr.port() as u64 | 0x9E37_79B9;
// Client -> Server direction.
{
let front = front.clone();
let back = back.clone();
let client_addr = client_addr.clone();
let mut rng = SmallRng::new(seed ^ 0xAAAA);
tokio::spawn(async move {
let mut buf = vec![0u8; 4096];
loop {
let (n, from) = match front.recv_from(&mut buf).await {
Ok(v) => v,
Err(_) => continue,
};
{
let mut ca = client_addr.lock().await;
if ca.is_none() {
*ca = Some(from);
}
}
let dg = buf[..n].to_vec();
// Drop?
if rng.below(100) < drop_pct {
continue;
}
let back = back.clone();
let delay = rng.below(8) as u64; // 0..8 ms jitter => reordering
tokio::spawn(async move {
if delay > 0 {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
let _ = back.send_to(&dg, server_addr).await;
});
}
});
}
// Server -> Client direction.
{
let front = front.clone();
let back = back.clone();
let client_addr = client_addr.clone();
let mut rng = SmallRng::new(seed ^ 0x5555);
tokio::spawn(async move {
let mut buf = vec![0u8; 4096];
loop {
let (n, _from) = match back.recv_from(&mut buf).await {
Ok(v) => v,
Err(_) => continue,
};
let dest = { *client_addr.lock().await };
let Some(dest) = dest else { continue };
let dg = buf[..n].to_vec();
if rng.below(100) < drop_pct {
continue;
}
let front = front.clone();
let delay = rng.below(8) as u64;
tokio::spawn(async move {
if delay > 0 {
tokio::time::sleep(Duration::from_millis(delay)).await;
}
let _ = front.send_to(&dg, dest).await;
});
}
});
}
front_addr
}
/// Tiny xorshift64 PRNG for loss/reorder decisions (no external dependency needed).
struct SmallRng(u64);
impl SmallRng {
fn new(seed: u64) -> Self {
Self(seed | 1)
}
fn next_u64(&mut self) -> u64 {
let mut x = self.0;
x ^= x << 13;
x ^= x >> 7;
x ^= x << 17;
self.0 = x;
x
}
/// A value in `0..bound`.
fn below(&mut self, bound: u32) -> u32 {
(self.next_u64() % bound as u64) as u32
}
}
#[tokio::test]
async fn udp_handshake_survives_loss_and_reorder() {
let (server_cfg, client_cfg) = make_configs();
let opts = UdpOpts::default(); // obfuscation off; default 250ms RTO / 10s deadline
// Real server on loopback.
let server =
UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server");
let server_addr = server.local_addr().expect("server local_addr");
// Lossy/reordering forwarder in front of it: drops ~30% and jitters survivors.
let proxy_addr = spawn_lossy_forwarder(server_addr, 30).await;
// Accept on the real server; connect the client to the *proxy*.
let accept_task = tokio::spawn(async move { server.accept().await });
let connect_task =
tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await });
let server_conn = server_conn_or_panic(accept_task).await;
let client_conn = client_conn_or_panic(connect_task).await;
assert_eq!(
server_conn.peer_id(),
Some(CLIENT_ID),
"handshake completed through loss/reorder; server learned client CN"
);
// Data must flow through the lossy channel. Because the data path is intentionally unreliable
// (UDP datagrams), we retry each send a few times until one survives the ~30% drop — this checks
// the *data* path works end-to-end, not that it is itself reliable (it is not, by design).
let server_conn: Arc<dyn PacketConnection> = Arc::new(server_conn);
let client_conn: Arc<dyn PacketConnection> = Arc::new(client_conn);
assert!(
deliver_with_retries(&client_conn, &server_conn, b"c2s through loss", 40).await,
"a client->server packet should eventually get through the lossy channel"
);
assert!(
deliver_with_retries(&server_conn, &client_conn, b"s2c through loss", 40).await,
"a server->client packet should eventually get through the lossy channel"
);
}
/// Send `payload` from `tx` repeatedly until `rx` receives exactly it, or `max_tries` is exhausted.
///
/// The data path is unreliable by design, so this retransmits at the application layer. We race each
/// receive against a short timeout so a dropped datagram does not block forever.
async fn deliver_with_retries(
tx: &Arc<dyn PacketConnection>,
rx: &Arc<dyn PacketConnection>,
payload: &[u8],
max_tries: u32,
) -> bool {
for _ in 0..max_tries {
tx.send_packet(payload).await.expect("send");
match tokio::time::timeout(Duration::from_millis(150), rx.recv_packet()).await {
Ok(Ok(got)) if got == payload => return true,
// Either a timeout (dropped) or some other datagram: retry.
_ => continue,
}
}
false
}
async fn server_conn_or_panic(
task: tokio::task::JoinHandle<anyhow::Result<UdpConnection>>,
) -> UdpConnection {
// Bound the whole handshake-through-loss with a generous ceiling so a hung test fails loudly.
tokio::time::timeout(Duration::from_secs(20), task)
.await
.expect("server accept did not finish within 20s (loss/reorder)")
.expect("accept join")
.expect("server accept")
}
async fn client_conn_or_panic(
task: tokio::task::JoinHandle<anyhow::Result<UdpConnection>>,
) -> UdpConnection {
tokio::time::timeout(Duration::from_secs(20), task)
.await
.expect("client connect did not finish within 20s (loss/reorder)")
.expect("connect join")
.expect("client connect")
}