Files
xah30 5e553b79df feat(cli): v3.3 circuit rotation — background rebuild every N seconds
Adds RotatingCircuit: the multi-hop circuit is silently torn down and
rebuilt on a configurable interval (default off) so a long-running
client periodically rotates its on-wire path. Application packets never
see the swap.

- RotatingCircuit::new(hops, udp_opts, interval) seeds an initial
  CircuitConnection synchronously (errors surface), then spawns a
  background rotator that every `interval`:
    1. dial_circuit(&hops, udp_opts) -> next: CircuitConnection
    2. std::mem::replace inside Arc<RwLock<Arc<CircuitConnection>>>
    3. old Arc dropped when its last in-flight Arc clone is released
       (its Drop aborts forwarders / closes outers).
  send_packet/recv_packet grab a cheap snapshot of the current Arc
  before awaiting, so reads/writes never block under the rotator.
- [client.circuit] rotation_interval_secs: u64 (default 0 = disabled);
  serde(default) keeps old configs working. When 0, the path is exactly
  the v3.2 dial_circuit + optional CellPaddingConn wrap (back-compat).
- CellPaddingConn wraps RotatingCircuit on the OUTSIDE so every new
  circuit shares the same cell_size — on-wire size signature stays
  stable across rotations.
- Integration test multihop_rotation::rotating_circuit_swaps_inner_
  under_traffic: 6 s of 100-ms ping/echo at interval=1.5s -> 37 sent,
  37 received, 2 rotations counted via test-only AtomicU64 counter.
- Synchronous-failure test confirms initial dial errors bubble up from
  ::new without spawning the rotator task.

Workspace: 297 tests passed (+4), clippy -D warnings clean, fmt clean.
293 baseline tests unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-27 21:25:05 +03:00

310 lines
13 KiB
Rust

//! v3.3 background **circuit rotation** integration test.
//!
//! Drives a 2-hop loopback circuit (client → relay → exit) wrapped in a
//! [`circuit::RotatingCircuit`] configured to rebuild itself every 1500 ms. Over the lifetime of
//! the test the client sends a steady stream of data packets and the exit echoes every one back
//! through the (silently rotating) circuit. Assertions:
//!
//! 1. At least two thirds of the packets round-trip successfully — a send or receive that is in
//!    flight while a rotation lands may be lost, but the data plane keeps serving afterwards.
//! 2. The [`RotatingCircuit::rotation_count`] reports at least two successful rotations by the
//!    time the test ends, proving the background rotator actually ran.
//!
//! ## Why two hops and not three
//!
//! The 3-hop test in `multihop.rs` exists for protocol coverage. The rotation logic is
//! orthogonal to hop count (it just re-runs whatever `dial_circuit` does), so we use the cheaper
//! 2-hop topology to keep the test fast. Each rotation = one fresh outer handshake to the
//! entry + one ExtendBridge + one inner handshake to the exit, plus full teardown of the
//! previous chain.
//!
//! ## Why fresh actors per rotation
//!
//! Each [`UdpServer::accept`] returns ONE connection per server instance. Rotating the circuit
//! re-dials the entry-relay and the exit, so both servers need to accept a *new* connection on
//! every rotation. The actors in this test loop on `accept` and spawn one handler task per
//! accepted connection, so they serve as many connections as the test dials; the relay and exit
//! ports are reused across rotations.
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use aura_cli::circuit::{self, HopConfig, RotatingCircuit};
use aura_cli::relay::{self, RendezvousOutcome};
use aura_pki::AuraCa;
use aura_proto::{ClientConfig, PacketConnection, ServerConfig};
use aura_transport::{UdpOpts, UdpServer};
/// Name the exit server's certificate is issued for; also the `server_name` the client uses
/// for the exit hop and the value the active circuit's `peer_id` is compared against.
const EXIT_SAN: &str = "localhost-exit-rot";
/// Name the relay server's certificate is issued for; also the `server_name` the client uses
/// for the entry hop.
const RELAY_SAN: &str = "localhost-relay-rot";
/// Identity passed to the CA when issuing the client certificate in `client_cfg`.
const CLIENT_ID: &str = "client-multihop-rot";
/// Ask the OS for a free UDP port on loopback and return its number.
///
/// The probe socket is closed when this function returns, so the port is only *likely* to be
/// free afterwards: callers re-bind it immediately and accept the small race window.
///
/// Panics if the probe socket cannot be bound or its local address cannot be read.
fn free_udp_port() -> u16 {
    let probe = std::net::UdpSocket::bind("127.0.0.1:0").expect("bind ephemeral udp");
    let bound = probe.local_addr().expect("local_addr");
    bound.port()
}
/// Issue a server certificate for `san` from `ca` and bundle it with the CA certificate into a
/// [`ServerConfig`].
///
/// Panics if the CA fails to issue the certificate.
fn server_cfg(ca: &AuraCa, san: &str) -> ServerConfig {
    let cert = ca.issue_server_cert(san).expect("issue server cert");
    let ca_cert_pem = ca.ca_cert_pem();
    ServerConfig {
        ca_cert_pem,
        server_cert_pem: cert.cert_pem,
        server_key_pem: cert.key_pem,
    }
}
/// Issue a client certificate for [`CLIENT_ID`] from `ca` and bundle it with the CA certificate
/// and the expected `server_name` into a [`ClientConfig`].
///
/// A fresh certificate is issued on every call. Panics if the CA fails to issue it.
fn client_cfg(ca: &AuraCa, server_name: &str) -> ClientConfig {
    let cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert");
    let ca_cert_pem = ca.ca_cert_pem();
    let server_name = server_name.to_string();
    ClientConfig {
        ca_cert_pem,
        client_cert_pem: cert.cert_pem,
        client_key_pem: cert.key_pem,
        server_name,
    }
}
/// Run the exit side's accept loop on `server`, serving an *unbounded* number of connections.
///
/// Every accepted connection is handed to its own spawned task, which echoes each received
/// packet back to its sender until a receive or a send on that connection fails. The accept
/// loop keeps going while handlers run, and returns as soon as `server.accept()` yields an
/// error. Callers that need the actor gone earlier abort the task running this future.
async fn spawn_multi_exit(server: UdpServer) {
    while let Ok(accepted) = server.accept().await {
        let echo_conn: Arc<dyn PacketConnection> = Arc::new(accepted);
        tokio::spawn(async move {
            // Echo until either direction of this connection reports an error.
            while let Ok(packet) = echo_conn.recv_packet().await {
                if echo_conn.send_packet(&packet).await.is_err() {
                    break;
                }
            }
        });
    }
}
/// Run the relay side's accept loop on `server`, bridging an *unbounded* number of clients.
///
/// Every accepted connection is handed to its own spawned task, which runs
/// [`relay::rendezvous`] against a copy of `whitelist` and, if the outcome is `Bridged`, runs
/// [`relay::run_bridge`] to completion. Any other outcome just drops the connection. The accept
/// loop returns as soon as `server.accept()` yields an error; the same actor is reused across
/// every rotation in this test.
async fn spawn_multi_relay(server: UdpServer, whitelist: Vec<SocketAddr>) {
    while let Ok(accepted) = server.accept().await {
        let client_conn: Arc<dyn PacketConnection> = Arc::new(accepted);
        let allowed = whitelist.clone();
        tokio::spawn(async move {
            let outcome = relay::rendezvous(&client_conn, &allowed).await;
            match outcome {
                RendezvousOutcome::Refused | RendezvousOutcome::Fallback { .. } => {
                    // Either no ExtendBridge ever arrived, or the exit was not on the
                    // whitelist. Drop the connection; the client's dial will fail loudly.
                }
                RendezvousOutcome::Bridged { bridge } => {
                    relay::run_bridge(client_conn, bridge).await;
                }
            }
        });
    }
}
/// End-to-end test: a 2-hop circuit rebuilt every 500 ms while a steady stream of data packets
/// passes through it. Asserts that every packet round-trips and that the rotation counter
/// advances at least twice over the ~3-second runtime.
#[tokio::test(flavor = "multi_thread")]
async fn rotating_circuit_swaps_inner_under_traffic() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
let ca = AuraCa::generate("Aura v3.3 rotation Test CA").expect("ca");
let exit_proto = server_cfg(&ca, EXIT_SAN);
let relay_proto = server_cfg(&ca, RELAY_SAN);
let exit_port = free_udp_port();
let relay_port = free_udp_port();
let exit_addr: SocketAddr = format!("127.0.0.1:{exit_port}").parse().unwrap();
let relay_addr: SocketAddr = format!("127.0.0.1:{relay_port}").parse().unwrap();
let exit_server =
UdpServer::bind(exit_addr, exit_proto, UdpOpts::default()).expect("bind exit");
let relay_server =
UdpServer::bind(relay_addr, relay_proto, UdpOpts::default()).expect("bind relay");
let exit_actual = exit_server.local_addr().expect("exit addr");
let relay_actual = relay_server.local_addr().expect("relay addr");
// The relay must allow re-bridging to the same exit on every rotation.
let whitelist = vec![exit_actual];
let exit_handle = tokio::spawn(spawn_multi_exit(exit_server));
let relay_handle = tokio::spawn(spawn_multi_relay(relay_server, whitelist));
// Let the actors enter their accept loops.
tokio::time::sleep(Duration::from_millis(50)).await;
// Per-hop client configs (RELAY_SAN for the entry, EXIT_SAN for the exit). We use the same
// global cert via `client_cfg`; this test focuses on rotation, not on identity-unlinkability.
let hops = vec![
HopConfig {
addr: relay_actual,
proto_cfg: client_cfg(&ca, RELAY_SAN),
},
HopConfig {
addr: exit_actual,
proto_cfg: client_cfg(&ca, EXIT_SAN),
},
];
// Construct the rotator. The first dial happens synchronously inside ::new, so by the time
// we return from this `await` the circuit is already serving packets. The interval is set
// long enough that the dial-time overhead of a single rebuild (~1 s on a loaded macOS box
// with three UDP-Aura handshakes happening in series) does not stack and starve the data
// pump between rotations.
let interval = Duration::from_millis(1500);
let rotator = tokio::time::timeout(
Duration::from_secs(20),
RotatingCircuit::new(hops, UdpOpts::default(), interval),
)
.await
.expect("RotatingCircuit::new did not finish within 20s")
.expect("RotatingCircuit::new succeeded");
let rotator: Arc<RotatingCircuit> = Arc::new(rotator);
// The currently-active circuit's peer_id is the exit's SAN — proves the inner handshake
// authenticated the exit through the relay opaquely.
assert_eq!(
rotator.peer_id().await.as_deref(),
Some(EXIT_SAN),
"active circuit's peer_id is the exit's SAN at construction time"
);
// Pump traffic for ~6 seconds, every 100 ms. With a 1.5 s rotation interval the rotator
// fires at t≈1.5, 3.0, 4.5 s — at least 2 rotations land inside the pump window even with
// significant rebuild overhead. Some sends/recvs may transiently fail if a rotation lands
// mid-send and tears down the inner connection underneath the snapshot — that is the
// documented behaviour ("in-flight calls error or block until timeout"). We tolerate a
// small number of such losses and assert the *majority* of packets round-trip.
let pump_duration = Duration::from_secs(6);
let send_interval = Duration::from_millis(100);
let start = std::time::Instant::now();
let mut sent = 0usize;
let mut received_ok = 0usize;
while start.elapsed() < pump_duration {
let pkt: Vec<u8> = format!("rot-{sent:04}").into_bytes();
// Send + recv. If a rotation lands while either is in flight the call on the old
// snapshot may error; that is acceptable — what we want to prove is that the rotator
// itself runs and that the data plane keeps serving on the freshly swapped-in circuit.
let send_res = rotator.send_packet(&pkt).await;
if send_res.is_err() {
sent += 1;
tokio::time::sleep(send_interval).await;
continue;
}
match tokio::time::timeout(Duration::from_secs(3), rotator.recv_packet()).await {
Ok(Ok(echoed)) => {
assert_eq!(echoed, pkt, "echoed payload matches sent payload");
received_ok += 1;
}
Ok(Err(_)) | Err(_) => {
// Rotation likely tore down the inner that this recv was waiting on. Acceptable.
}
}
sent += 1;
tokio::time::sleep(send_interval).await;
}
let rotations = rotator.rotation_count();
println!(
"v3.3 rotating circuit: sent={sent} received_ok={received_ok} rotations={rotations} \
in {:?}",
start.elapsed()
);
assert!(
sent >= 30,
"expected at least 30 packets attempted in 6 s, got {sent}"
);
// At least 2/3 of the sent packets must round-trip — the gaps come from rotation windows.
assert!(
received_ok * 3 >= sent * 2,
"expected at least 2/3 of {sent} packets to echo back, got {received_ok}"
);
assert!(
rotations >= 2,
"expected at least 2 successful rotations in 6 s at 1500 ms interval, got {rotations}"
);
// Drop the rotator first to abort the background task and tear down the active circuit. The
// actors then exit naturally as their accept loops drop.
drop(rotator);
relay_handle.abort();
exit_handle.abort();
// Best-effort wait so the actor tasks unblock the runtime before the test runs to completion.
let _ = tokio::time::timeout(Duration::from_millis(200), async {
let _ = relay_handle.await;
let _ = exit_handle.await;
})
.await;
}
/// `RotatingCircuit::new` must return an error when the initial dial cannot succeed.
///
/// Both hops point at loopback ports where no server is expected to be listening. The test
/// checks that construction finishes within 30 s, that it yields `Err`, and that the rendered
/// error chain mentions the initial dial. The intent is that the caller never ends up with a
/// "zombie" rotator hammering an unreachable address; whether a background task was spawned
/// is not observable from this test.
#[tokio::test(flavor = "multi_thread")]
async fn rotating_circuit_initial_dial_failure_is_synchronous() {
    let ca = AuraCa::generate("Aura v3.3 rotation init-fail Test CA").expect("ca");

    // Loopback ports 1 and 2: valid addresses with (presumably) nothing answering on them, so
    // the initial dial is expected to fail rather than connect.
    let unreachable_entry: SocketAddr = "127.0.0.1:1".parse().unwrap();
    let unreachable_exit: SocketAddr = "127.0.0.1:2".parse().unwrap();
    let hops = vec![
        HopConfig {
            addr: unreachable_entry,
            proto_cfg: client_cfg(&ca, RELAY_SAN),
        },
        HopConfig {
            addr: unreachable_exit,
            proto_cfg: client_cfg(&ca, EXIT_SAN),
        },
    ];

    // The dial relies on whatever connect timeout `UdpOpts::default()` implies; the outer
    // 30 s bound only guards against the dial hanging for longer than expected.
    let rotation_interval = Duration::from_secs(60);
    let construction = RotatingCircuit::new(hops, UdpOpts::default(), rotation_interval);
    let res = tokio::time::timeout(Duration::from_secs(30), construction)
        .await
        .expect("RotatingCircuit::new returned within 30 s");

    let err = match res {
        Err(e) => e,
        Ok(_) => panic!("RotatingCircuit::new must fail when the entry hop is unreachable"),
    };

    // Render the full error chain and look for either of the two context strings that mark an
    // initial-dial failure.
    let msg = format!("{err:#}");
    let mentions_initial_dial =
        msg.contains("initial dial_circuit") || msg.contains("dial entry hop");
    assert!(
        mentions_initial_dial,
        "expected initial-dial error, got: {msg}"
    );

    // Smoke check that the circuit module is still nameable after the failed construction;
    // this also keeps the `circuit` import in use.
    let _ = circuit::dial_circuit_shared_cfg;
}