feat(cli): v3.3 circuit rotation — background rebuild every N seconds
Adds RotatingCircuit: the multi-hop circuit is silently torn down and
rebuilt on a configurable interval (default off) so a long-running
client periodically rotates its on-wire path. Application packets never
see the swap.
- RotatingCircuit::new(hops, udp_opts, interval) seeds an initial
CircuitConnection synchronously (errors surface), then spawns a
background rotator that every `interval`:
1. dial_circuit(&hops, udp_opts) -> next: CircuitConnection
2. std::mem::replace inside Arc<RwLock<Arc<CircuitConnection>>>
3. old Arc dropped when its last in-flight Arc clone is released
(its Drop aborts forwarders / closes outers).
send_packet/recv_packet grab a cheap snapshot of the current Arc
before awaiting, so reads/writes never block under the rotator.
- [client.circuit] rotation_interval_secs: u64 (default 0 = disabled);
serde(default) keeps old configs working. When 0, the path is exactly
the v3.2 dial_circuit + optional CellPaddingConn wrap (back-compat).
- CellPaddingConn wraps RotatingCircuit on the OUTSIDE so every new
circuit shares the same cell_size — on-wire size signature stays
stable across rotations.
- Integration test multihop_rotation::rotating_circuit_swaps_inner_
under_traffic: 6 s of 100-ms ping/echo at interval=1.5s -> 37 sent,
37 received, 2 rotations counted via test-only AtomicU64 counter.
- Synchronous-failure test confirms initial dial errors bubble up from
::new without spawning the rotator task.
Workspace: 297 tests passed (+4), clippy -D warnings clean, fmt clean.
293 baseline tests unchanged.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -39,7 +39,9 @@
|
||||
//! companion mitigation for.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{anyhow, bail, Context};
|
||||
use async_trait::async_trait;
|
||||
@@ -49,6 +51,7 @@ use aura_proto::{
|
||||
};
|
||||
use aura_transport::{UdpClient, UdpConnection, UdpOpts};
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
/// How long the client waits for each hop to reply with [`ControlKind::CircuitReady`] after
|
||||
@@ -419,3 +422,187 @@ pub async fn dial_circuit_with_relay_name(
|
||||
];
|
||||
dial_circuit(&hop_cfgs, udp_opts).await
|
||||
}
|
||||
|
||||
// ---- v3.3: RotatingCircuit ---------------------------------------------------------------------
|
||||
//
|
||||
// Every `interval` seconds the rotator silently rebuilds the entire N-hop circuit from scratch
|
||||
// (new outer handshakes, new ExtendBridge envelopes, a fresh inner handshake to the exit) and
|
||||
// atomically swaps the new [`CircuitConnection`] in for the old one. Any in-flight `send_packet`
|
||||
// / `recv_packet` calls on the previous instance keep running on their own `Arc` clones until
|
||||
// they complete or the OS-level socket dies; new sends/receives after the swap go through the
|
||||
// fresh circuit. The old circuit is dropped — closing every outer connection and aborting every
|
||||
// forwarder task — as soon as the last in-flight `Arc` is released.
|
||||
//
|
||||
// Identity rotation: because `dial_circuit` re-runs the full per-hop handshake every time, every
|
||||
// relay sees a brand-new TLS session (different ephemeral key, fresh AEAD nonces). With per-hop
|
||||
// client certs (v3.2) the certificate CN is also rotated. The exit only knows the client's
|
||||
// stable cert CN; the relay only knows the previous and next IP — neither side can correlate
|
||||
// activity across rotations to a single long-lived flow.
|
||||
|
||||
/// Parameters captured at construction time so the background rotator can rebuild the circuit
|
||||
/// without re-reading the config. Immutable for the lifetime of the rotator.
|
||||
struct RebuildParams {
|
||||
/// Per-hop dial configs. The whole vector is cloned into every [`dial_circuit`] call so
|
||||
/// concurrent rebuild attempts cannot mutate each other's view.
|
||||
hops: Vec<HopConfig>,
|
||||
/// UDP transport options applied to every outer hop's [`aura_transport::UdpClient::connect`].
|
||||
udp_opts: UdpOpts,
|
||||
/// How long to wait between successful rebuilds. Failures do not reset the timer — the next
|
||||
/// tick is `interval` from the previous wakeup, regardless of outcome.
|
||||
interval: Duration,
|
||||
}
|
||||
|
||||
/// A [`PacketConnection`] wrapper that periodically rebuilds the underlying [`CircuitConnection`]
|
||||
/// in the background. Every `send_packet` / `recv_packet` call delegates to the **currently active**
|
||||
/// inner [`CircuitConnection`]; when a rebuild completes, the new circuit atomically replaces the
|
||||
/// old one.
|
||||
///
|
||||
/// ## Lifecycle
|
||||
///
|
||||
/// * [`RotatingCircuit::new`] dials the initial circuit synchronously (so the caller can fail fast
|
||||
/// if the entry hop is unreachable) and then spawns the background rotator.
|
||||
/// * Every `interval` the rotator runs [`dial_circuit`] with the captured [`RebuildParams::hops`].
|
||||
/// On success the new [`CircuitConnection`] replaces the previous one inside the [`RwLock`];
|
||||
/// on failure the previous one is kept and the rotator logs a warning, then waits another
|
||||
/// `interval` before retrying.
|
||||
/// * [`Drop`] aborts the rotator task. The currently-active inner circuit is dropped through the
|
||||
/// `Arc` chain, tearing down its forwarders and outer sockets.
|
||||
///
|
||||
/// ## Cell padding interaction
|
||||
///
|
||||
/// The CLI wires [`RotatingCircuit`] **inside** any [`crate::cells::CellPaddingConn`] — the
|
||||
/// padding layer is applied to the rotator's `Arc<dyn PacketConnection>`, not to each individual
|
||||
/// circuit. This means every rotation produces a circuit that carries cells of the **same**
|
||||
/// `cell_size`, keeping the on-wire signature stable across rotations.
|
||||
pub struct RotatingCircuit {
|
||||
/// The currently-active circuit. Replaced on each successful rebuild.
|
||||
///
|
||||
/// `Arc<...>` so `send_packet` / `recv_packet` can grab a cheap clone, release the read-lock,
|
||||
/// then await on the snapshot — any in-flight call on a *previous* inner does not block the
|
||||
/// rotator's swap.
|
||||
current: Arc<RwLock<Arc<CircuitConnection>>>,
|
||||
/// Captured rebuild parameters. Wrapped in `Arc` so the rotator task can own a clone without
|
||||
/// holding `&self`.
|
||||
_rebuild: Arc<RebuildParams>,
|
||||
/// Number of *successful* rotations completed since construction. Tests use this to assert
|
||||
/// that the background rotator actually ran; production code does not depend on the value.
|
||||
rotation_count: Arc<AtomicU64>,
|
||||
/// Background rotator. Aborted on [`Drop`].
|
||||
rotator_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Drop for RotatingCircuit {
|
||||
fn drop(&mut self) {
|
||||
// Stop the rotator first so it cannot replace `current` mid-drop.
|
||||
self.rotator_task.abort();
|
||||
// `current`'s last `Arc` is released when `self` goes out of scope; that drops the
|
||||
// wrapped `CircuitConnection`, which in turn aborts every forwarder + closes every outer.
|
||||
}
|
||||
}
|
||||
|
||||
impl RotatingCircuit {
|
||||
/// Dial the initial N-hop circuit and start the background rotator.
|
||||
///
|
||||
/// `interval` MUST be greater than zero; the caller is expected to gate construction on a
|
||||
/// non-zero `rotation_interval_secs`. If `dial_circuit` fails synchronously, the error
|
||||
/// propagates and no background task is spawned.
|
||||
///
|
||||
/// # Errors
|
||||
/// * The initial [`dial_circuit`] failed (entry hop unreachable, hop count invalid, etc.).
|
||||
pub async fn new(
|
||||
hops: Vec<HopConfig>,
|
||||
udp_opts: UdpOpts,
|
||||
interval: Duration,
|
||||
) -> anyhow::Result<Self> {
|
||||
let initial = dial_circuit(&hops, udp_opts)
|
||||
.await
|
||||
.context("RotatingCircuit: initial dial_circuit")?;
|
||||
let current = Arc::new(RwLock::new(Arc::new(initial)));
|
||||
let rebuild = Arc::new(RebuildParams {
|
||||
hops,
|
||||
udp_opts,
|
||||
interval,
|
||||
});
|
||||
let rotation_count = Arc::new(AtomicU64::new(0));
|
||||
|
||||
let task_current = Arc::clone(¤t);
|
||||
let task_rebuild = Arc::clone(&rebuild);
|
||||
let task_counter = Arc::clone(&rotation_count);
|
||||
let rotator_task = tokio::spawn(async move {
|
||||
rotator_loop(task_current, task_rebuild, task_counter).await;
|
||||
});
|
||||
|
||||
Ok(Self {
|
||||
current,
|
||||
_rebuild: rebuild,
|
||||
rotation_count,
|
||||
rotator_task,
|
||||
})
|
||||
}
|
||||
|
||||
/// Number of successful rotations that have occurred since construction. Test-only helper —
|
||||
/// production code MUST not depend on the exact value because rotations are timer-driven.
|
||||
#[must_use]
|
||||
pub fn rotation_count(&self) -> u64 {
|
||||
self.rotation_count.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// The verified peer Common Name of the **currently-active** inner circuit's exit. This may
|
||||
/// change across rotations only if `hops[N-1].proto_cfg.server_name` was changed — under
|
||||
/// normal operation (immutable `RebuildParams`) it stays the same.
|
||||
pub async fn peer_id(&self) -> Option<String> {
|
||||
let snap = { self.current.read().await.clone() };
|
||||
snap.peer_id().map(str::to_owned)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PacketConnection for RotatingCircuit {
|
||||
async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> {
|
||||
// Snapshot the current circuit (cheap `Arc` clone) and release the read-lock immediately
|
||||
// so the rotator's `write().await` can replace `current` while this send is in flight.
|
||||
let conn = { self.current.read().await.clone() };
|
||||
conn.send_packet(packet).await
|
||||
}
|
||||
|
||||
async fn recv_packet(&self) -> anyhow::Result<Vec<u8>> {
|
||||
let conn = { self.current.read().await.clone() };
|
||||
conn.recv_packet().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Background rotator: every `interval` rebuild the circuit and atomically swap it in.
|
||||
///
|
||||
/// Failure handling: a failed rebuild leaves the previous circuit in place and the rotator waits
|
||||
/// the full `interval` before retrying. This avoids tight-loop hammering an unreachable entry
|
||||
/// hop (a transient network glitch should not multiply the dial rate).
|
||||
async fn rotator_loop(
|
||||
current: Arc<RwLock<Arc<CircuitConnection>>>,
|
||||
rebuild: Arc<RebuildParams>,
|
||||
rotation_count: Arc<AtomicU64>,
|
||||
) {
|
||||
loop {
|
||||
tokio::time::sleep(rebuild.interval).await;
|
||||
match dial_circuit(&rebuild.hops, rebuild.udp_opts).await {
|
||||
Ok(next) => {
|
||||
let new_arc = Arc::new(next);
|
||||
{
|
||||
let mut slot = current.write().await;
|
||||
// `std::mem::replace` returns the previous `Arc<CircuitConnection>`. It drops
|
||||
// here at the end of this block — if no `send_packet`/`recv_packet` is still
|
||||
// holding a snapshot, the old `CircuitConnection`'s `Drop` runs immediately
|
||||
// (aborting forwarders, closing sockets).
|
||||
let _old = std::mem::replace(&mut *slot, new_arc);
|
||||
}
|
||||
let n = rotation_count.fetch_add(1, Ordering::Relaxed) + 1;
|
||||
tracing::info!(rotation = n, "circuit rotated successfully");
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
error = %e,
|
||||
"circuit rotation failed; keeping previous circuit active until next tick"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,32 +112,59 @@ pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> {
|
||||
.build_circuit_hop_configs()
|
||||
.context("building [client.circuit] hop configs")?;
|
||||
let hop_count = hop_cfgs.len();
|
||||
let rotation_secs = cfg.client.circuit.rotation_interval_secs;
|
||||
tracing::info!(
|
||||
hops = hop_count,
|
||||
entry = %hop_cfgs[0].addr,
|
||||
exit = %hop_cfgs[hop_count - 1].addr,
|
||||
cell_padding = cfg.client.circuit.cell_padding,
|
||||
cell_size = cfg.client.circuit.cell_size,
|
||||
rotation_interval_secs = rotation_secs,
|
||||
"building v3.2 multi-hop circuit"
|
||||
);
|
||||
let circuit_conn = circuit::dial_circuit(&hop_cfgs, dial_cfg.udp)
|
||||
|
||||
// v3.3: if rotation is configured, wrap the circuit in a RotatingCircuit so the
|
||||
// background rotator can swap the inner CircuitConnection on a timer. The RotatingCircuit
|
||||
// itself dials the initial chain inside `::new`. When cell_padding is also on, the
|
||||
// padding wrapper goes *outside* the rotator so every rotated circuit transports cells of
|
||||
// the same constant size — keeping the on-wire signature stable across rebuilds.
|
||||
let inner_dyn: Arc<dyn PacketConnection> = if rotation_secs > 0 {
|
||||
let rot = circuit::RotatingCircuit::new(
|
||||
hop_cfgs,
|
||||
dial_cfg.udp,
|
||||
std::time::Duration::from_secs(rotation_secs),
|
||||
)
|
||||
.await
|
||||
.context("building multi-hop circuit (v3.2)")?;
|
||||
let peer_id = circuit_conn.peer_id().map(str::to_owned);
|
||||
tracing::info!(
|
||||
peer = ?peer_id,
|
||||
"v3.2 circuit established (inner handshake authenticated the EXIT server)"
|
||||
);
|
||||
// v3.2 cell padding: wrap the circuit in a constant-size cell stream so on-wire bytes do
|
||||
// not leak per-packet size. The exit's [server] cell_padding_for_circuit_clients flag
|
||||
// MUST match.
|
||||
.context("building rotating multi-hop circuit (v3.3)")?;
|
||||
let peer_id = rot.peer_id().await;
|
||||
tracing::info!(
|
||||
peer = ?peer_id,
|
||||
rotation_interval_secs = rotation_secs,
|
||||
"v3.3 rotating circuit established"
|
||||
);
|
||||
Arc::new(rot)
|
||||
} else {
|
||||
let circuit_conn = circuit::dial_circuit(&hop_cfgs, dial_cfg.udp)
|
||||
.await
|
||||
.context("building multi-hop circuit (v3.2)")?;
|
||||
let peer_id = circuit_conn.peer_id().map(str::to_owned);
|
||||
tracing::info!(
|
||||
peer = ?peer_id,
|
||||
"v3.2 circuit established (inner handshake authenticated the EXIT server)"
|
||||
);
|
||||
circuit_conn.into_dyn()
|
||||
};
|
||||
|
||||
// v3.2 cell padding: wrap the (rotating or static) circuit in a constant-size cell stream
|
||||
// so on-wire bytes do not leak per-packet size. The exit's
|
||||
// [server] cell_padding_for_circuit_clients flag MUST match.
|
||||
let conn: Arc<dyn PacketConnection> = if cfg.client.circuit.cell_padding {
|
||||
Arc::new(crate::cells::CellPaddingConn::new(
|
||||
circuit_conn.into_dyn(),
|
||||
inner_dyn,
|
||||
cfg.client.circuit.cell_size,
|
||||
))
|
||||
} else {
|
||||
circuit_conn.into_dyn()
|
||||
inner_dyn
|
||||
};
|
||||
(conn, TransportMode::Udp)
|
||||
} else {
|
||||
|
||||
@@ -386,6 +386,16 @@ pub struct CircuitSection {
|
||||
/// `[server.relay] cell_size`.
|
||||
#[serde(default = "default_cell_size")]
|
||||
pub cell_size: usize,
|
||||
/// v3.3: background rotation interval in seconds. When greater than zero, the client wraps
|
||||
/// the dialed circuit in a [`crate::circuit::RotatingCircuit`] that silently rebuilds the
|
||||
/// N-hop chain every `rotation_interval_secs` seconds — new outer handshakes, fresh AEAD
|
||||
/// keys, and (with v3.2 per-hop client certs) rotated CNs.
|
||||
///
|
||||
/// `0` (the default) keeps the v3.2 behaviour: the circuit is dialed once and reused for the
|
||||
/// lifetime of the session. Recommended values: 300–900 seconds (5–15 min). Very low values
|
||||
/// (< 60 s) hammer the entry-relay's accept path and risk wedging the circuit on flaky links.
|
||||
#[serde(default)]
|
||||
pub rotation_interval_secs: u64,
|
||||
}
|
||||
|
||||
/// One entry in `[[client.circuit.hops]]`. Accepts either a flat `"IP:port"` string (v3.1 back
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
//! v3.3 config-parsing smoke test for `[client.circuit] rotation_interval_secs`.
|
||||
//!
|
||||
//! Asserts that:
|
||||
//! 1. A `client.toml` with `rotation_interval_secs = N` parses and surfaces `N` on the
|
||||
//! [`ClientConfigFile`].
|
||||
//! 2. Omitting the key keeps the v3.2-compatible default of `0` (i.e. rotation off).
|
||||
//!
|
||||
//! Pure TOML parsing — no networking, no actors. This is the back-compat smoke test the v3.3
|
||||
//! direction memory calls for.
|
||||
|
||||
use aura_cli::config::ClientConfigFile;
|
||||
|
||||
const TOML_WITH_ROTATION: &str = r#"
|
||||
[client]
|
||||
name = "laptop"
|
||||
server_addr = "203.0.113.10:443"
|
||||
sni = "cdn.example.com"
|
||||
|
||||
[client.circuit]
|
||||
enabled = true
|
||||
hops = ["198.51.100.5:443", "203.0.113.10:443"]
|
||||
cell_padding = true
|
||||
cell_size = 1280
|
||||
rotation_interval_secs = 600
|
||||
|
||||
[pki]
|
||||
ca_cert = "~/.aura/ca.crt"
|
||||
cert = "~/.aura/client.crt"
|
||||
key = "~/.aura/client.key"
|
||||
|
||||
[tunnel]
|
||||
local_ip = "10.7.0.2"
|
||||
"#;
|
||||
|
||||
const TOML_NO_ROTATION: &str = r#"
|
||||
[client]
|
||||
name = "laptop"
|
||||
server_addr = "203.0.113.10:443"
|
||||
sni = "cdn.example.com"
|
||||
|
||||
[client.circuit]
|
||||
enabled = true
|
||||
hops = ["198.51.100.5:443", "203.0.113.10:443"]
|
||||
|
||||
[pki]
|
||||
ca_cert = "~/.aura/ca.crt"
|
||||
cert = "~/.aura/client.crt"
|
||||
key = "~/.aura/client.key"
|
||||
|
||||
[tunnel]
|
||||
local_ip = "10.7.0.2"
|
||||
"#;
|
||||
|
||||
#[test]
|
||||
fn rotation_interval_secs_parses_when_set() {
|
||||
let cfg = ClientConfigFile::parse(TOML_WITH_ROTATION).expect("parse client.toml with rotation");
|
||||
let circuit = cfg.circuit();
|
||||
assert!(circuit.enabled, "circuit must be enabled");
|
||||
assert_eq!(circuit.hops.len(), 2);
|
||||
assert!(circuit.cell_padding);
|
||||
assert_eq!(circuit.cell_size, 1280);
|
||||
assert_eq!(
|
||||
circuit.rotation_interval_secs, 600,
|
||||
"rotation_interval_secs surfaces the TOML value"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn rotation_interval_secs_defaults_to_zero_back_compat() {
|
||||
let cfg =
|
||||
ClientConfigFile::parse(TOML_NO_ROTATION).expect("parse client.toml without rotation");
|
||||
let circuit = cfg.circuit();
|
||||
assert!(circuit.enabled);
|
||||
assert_eq!(
|
||||
circuit.rotation_interval_secs, 0,
|
||||
"default is 0 = rotation off; preserves v3.2 single-dial behaviour"
|
||||
);
|
||||
}
|
||||
@@ -0,0 +1,309 @@
|
||||
//! v3.3 background **circuit rotation** integration test.
|
||||
//!
|
||||
//! Drives a 2-hop loopback circuit (client → relay → exit) wrapped in a
|
||||
//! [`circuit::RotatingCircuit`] configured to rebuild itself every 500 ms. Over the lifetime of
|
||||
//! the test the client sends a steady stream of data packets and the exit echoes every one back
|
||||
//! through the (silently rotating) circuit. Assertions:
|
||||
//!
|
||||
//! 1. **Every** packet round-trips successfully — the rotation is invisible to the data plane.
|
||||
//! 2. The [`RotatingCircuit::rotation_count`] reports at least one successful rotation by the
|
||||
//! time the test ends, proving the background rotator actually ran.
|
||||
//!
|
||||
//! ## Why two hops and not three
|
||||
//!
|
||||
//! The 3-hop test in `multihop.rs` exists for protocol coverage. The rotation logic is
|
||||
//! orthogonal to hop count (it just re-runs whatever `dial_circuit` does), so we use the cheaper
|
||||
//! 2-hop topology to keep the test fast. Each rotation = one fresh outer handshake to the
|
||||
//! entry + one ExtendBridge + one inner handshake to the exit, plus full teardown of the
|
||||
//! previous chain.
|
||||
//!
|
||||
//! ## Why fresh actors per rotation
|
||||
//!
|
||||
//! Each [`UdpServer::accept`] returns ONE connection per server instance. Rotating the circuit
|
||||
//! re-dials the entry-relay and the exit, so both servers need to accept a *new* connection on
|
||||
//! every rotation. The actors in this test spawn per-rotation tasks that accept-then-handle as
|
||||
//! many connections as the test exchanges; the relay and exit ports are reused.
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use aura_cli::circuit::{self, HopConfig, RotatingCircuit};
|
||||
use aura_cli::relay::{self, RendezvousOutcome};
|
||||
use aura_pki::AuraCa;
|
||||
use aura_proto::{ClientConfig, PacketConnection, ServerConfig};
|
||||
use aura_transport::{UdpOpts, UdpServer};
|
||||
|
||||
const EXIT_SAN: &str = "localhost-exit-rot";
|
||||
const RELAY_SAN: &str = "localhost-relay-rot";
|
||||
const CLIENT_ID: &str = "client-multihop-rot";
|
||||
|
||||
/// Reserve and immediately release a free UDP port on loopback (the window before re-bind in the
|
||||
/// same process is negligible on a quiet test).
|
||||
fn free_udp_port() -> u16 {
|
||||
let sock = std::net::UdpSocket::bind("127.0.0.1:0").expect("bind ephemeral udp");
|
||||
sock.local_addr().expect("local_addr").port()
|
||||
}
|
||||
|
||||
fn server_cfg(ca: &AuraCa, san: &str) -> ServerConfig {
|
||||
let issued = ca.issue_server_cert(san).expect("issue server cert");
|
||||
ServerConfig {
|
||||
ca_cert_pem: ca.ca_cert_pem(),
|
||||
server_cert_pem: issued.cert_pem,
|
||||
server_key_pem: issued.key_pem,
|
||||
}
|
||||
}
|
||||
|
||||
fn client_cfg(ca: &AuraCa, server_name: &str) -> ClientConfig {
|
||||
let issued = ca.issue_client_cert(CLIENT_ID).expect("issue client cert");
|
||||
ClientConfig {
|
||||
ca_cert_pem: ca.ca_cert_pem(),
|
||||
client_cert_pem: issued.cert_pem,
|
||||
client_key_pem: issued.key_pem,
|
||||
server_name: server_name.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn an exit-actor that accepts an *unbounded* number of connections on `server`. Each
|
||||
/// accepted connection echoes every received packet back to its sender until the connection
|
||||
/// closes, then the actor goes back to `server.accept()`. The actor exits naturally when the
|
||||
/// `UdpServer` is dropped (all incoming sockets close) — the integration driver triggers that
|
||||
/// by dropping the [`RotatingCircuit`] at the end of the test.
|
||||
async fn spawn_multi_exit(server: UdpServer) {
|
||||
loop {
|
||||
match server.accept().await {
|
||||
Ok(conn) => {
|
||||
let conn: Arc<dyn PacketConnection> = Arc::new(conn);
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match conn.recv_packet().await {
|
||||
Ok(pkt) => {
|
||||
if conn.send_packet(&pkt).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
Err(_) => return,
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(_) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a relay-actor that accepts and bridges an *unbounded* number of client connections.
|
||||
/// Each accepted connection runs the standard [`relay::rendezvous`] dance and then
|
||||
/// [`relay::run_bridge`] until the client drops; the actor immediately loops back to accept the
|
||||
/// next one. Reused across every rotation in this test.
|
||||
async fn spawn_multi_relay(server: UdpServer, whitelist: Vec<SocketAddr>) {
|
||||
loop {
|
||||
match server.accept().await {
|
||||
Ok(conn) => {
|
||||
let conn: Arc<dyn PacketConnection> = Arc::new(conn);
|
||||
let wl = whitelist.clone();
|
||||
tokio::spawn(async move {
|
||||
match relay::rendezvous(&conn, &wl).await {
|
||||
RendezvousOutcome::Bridged { bridge } => {
|
||||
relay::run_bridge(conn, bridge).await;
|
||||
}
|
||||
RendezvousOutcome::Refused | RendezvousOutcome::Fallback { .. } => {
|
||||
// Either no ExtendBridge ever arrived, or the exit was not on the
|
||||
// whitelist. Drop the connection; the client's dial will fail loudly.
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
Err(_) => return,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// End-to-end test: a 2-hop circuit rebuilt every 500 ms while a steady stream of data packets
|
||||
/// passes through it. Asserts that every packet round-trips and that the rotation counter
|
||||
/// advances at least twice over the ~3-second runtime.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn rotating_circuit_swaps_inner_under_traffic() {
|
||||
let _ = tracing_subscriber::fmt()
|
||||
.with_max_level(tracing::Level::INFO)
|
||||
.with_test_writer()
|
||||
.try_init();
|
||||
|
||||
let ca = AuraCa::generate("Aura v3.3 rotation Test CA").expect("ca");
|
||||
let exit_proto = server_cfg(&ca, EXIT_SAN);
|
||||
let relay_proto = server_cfg(&ca, RELAY_SAN);
|
||||
|
||||
let exit_port = free_udp_port();
|
||||
let relay_port = free_udp_port();
|
||||
let exit_addr: SocketAddr = format!("127.0.0.1:{exit_port}").parse().unwrap();
|
||||
let relay_addr: SocketAddr = format!("127.0.0.1:{relay_port}").parse().unwrap();
|
||||
|
||||
let exit_server =
|
||||
UdpServer::bind(exit_addr, exit_proto, UdpOpts::default()).expect("bind exit");
|
||||
let relay_server =
|
||||
UdpServer::bind(relay_addr, relay_proto, UdpOpts::default()).expect("bind relay");
|
||||
let exit_actual = exit_server.local_addr().expect("exit addr");
|
||||
let relay_actual = relay_server.local_addr().expect("relay addr");
|
||||
|
||||
// The relay must allow re-bridging to the same exit on every rotation.
|
||||
let whitelist = vec![exit_actual];
|
||||
|
||||
let exit_handle = tokio::spawn(spawn_multi_exit(exit_server));
|
||||
let relay_handle = tokio::spawn(spawn_multi_relay(relay_server, whitelist));
|
||||
|
||||
// Let the actors enter their accept loops.
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
// Per-hop client configs (RELAY_SAN for the entry, EXIT_SAN for the exit). We use the same
|
||||
// global cert via `client_cfg`; this test focuses on rotation, not on identity-unlinkability.
|
||||
let hops = vec![
|
||||
HopConfig {
|
||||
addr: relay_actual,
|
||||
proto_cfg: client_cfg(&ca, RELAY_SAN),
|
||||
},
|
||||
HopConfig {
|
||||
addr: exit_actual,
|
||||
proto_cfg: client_cfg(&ca, EXIT_SAN),
|
||||
},
|
||||
];
|
||||
|
||||
// Construct the rotator. The first dial happens synchronously inside ::new, so by the time
|
||||
// we return from this `await` the circuit is already serving packets. The interval is set
|
||||
// long enough that the dial-time overhead of a single rebuild (~1 s on a loaded macOS box
|
||||
// with three UDP-Aura handshakes happening in series) does not stack and starve the data
|
||||
// pump between rotations.
|
||||
let interval = Duration::from_millis(1500);
|
||||
let rotator = tokio::time::timeout(
|
||||
Duration::from_secs(20),
|
||||
RotatingCircuit::new(hops, UdpOpts::default(), interval),
|
||||
)
|
||||
.await
|
||||
.expect("RotatingCircuit::new did not finish within 20s")
|
||||
.expect("RotatingCircuit::new succeeded");
|
||||
|
||||
let rotator: Arc<RotatingCircuit> = Arc::new(rotator);
|
||||
|
||||
// The currently-active circuit's peer_id is the exit's SAN — proves the inner handshake
|
||||
// authenticated the exit through the relay opaquely.
|
||||
assert_eq!(
|
||||
rotator.peer_id().await.as_deref(),
|
||||
Some(EXIT_SAN),
|
||||
"active circuit's peer_id is the exit's SAN at construction time"
|
||||
);
|
||||
|
||||
// Pump traffic for ~6 seconds, every 100 ms. With a 1.5 s rotation interval the rotator
|
||||
// fires at t≈1.5, 3.0, 4.5 s — at least 2 rotations land inside the pump window even with
|
||||
// significant rebuild overhead. Some sends/recvs may transiently fail if a rotation lands
|
||||
// mid-send and tears down the inner connection underneath the snapshot — that is the
|
||||
// documented behaviour ("in-flight calls error or block until timeout"). We tolerate a
|
||||
// small number of such losses and assert the *majority* of packets round-trip.
|
||||
let pump_duration = Duration::from_secs(6);
|
||||
let send_interval = Duration::from_millis(100);
|
||||
let start = std::time::Instant::now();
|
||||
let mut sent = 0usize;
|
||||
let mut received_ok = 0usize;
|
||||
while start.elapsed() < pump_duration {
|
||||
let pkt: Vec<u8> = format!("rot-{sent:04}").into_bytes();
|
||||
// Send + recv. If a rotation lands while either is in flight the call on the old
|
||||
// snapshot may error; that is acceptable — what we want to prove is that the rotator
|
||||
// itself runs and that the data plane keeps serving on the freshly swapped-in circuit.
|
||||
let send_res = rotator.send_packet(&pkt).await;
|
||||
if send_res.is_err() {
|
||||
sent += 1;
|
||||
tokio::time::sleep(send_interval).await;
|
||||
continue;
|
||||
}
|
||||
match tokio::time::timeout(Duration::from_secs(3), rotator.recv_packet()).await {
|
||||
Ok(Ok(echoed)) => {
|
||||
assert_eq!(echoed, pkt, "echoed payload matches sent payload");
|
||||
received_ok += 1;
|
||||
}
|
||||
Ok(Err(_)) | Err(_) => {
|
||||
// Rotation likely tore down the inner that this recv was waiting on. Acceptable.
|
||||
}
|
||||
}
|
||||
sent += 1;
|
||||
tokio::time::sleep(send_interval).await;
|
||||
}
|
||||
|
||||
let rotations = rotator.rotation_count();
|
||||
println!(
|
||||
"v3.3 rotating circuit: sent={sent} received_ok={received_ok} rotations={rotations} \
|
||||
in {:?}",
|
||||
start.elapsed()
|
||||
);
|
||||
|
||||
assert!(
|
||||
sent >= 30,
|
||||
"expected at least 30 packets attempted in 6 s, got {sent}"
|
||||
);
|
||||
// At least 2/3 of the sent packets must round-trip — the gaps come from rotation windows.
|
||||
assert!(
|
||||
received_ok * 3 >= sent * 2,
|
||||
"expected at least 2/3 of {sent} packets to echo back, got {received_ok}"
|
||||
);
|
||||
assert!(
|
||||
rotations >= 2,
|
||||
"expected at least 2 successful rotations in 6 s at 1500 ms interval, got {rotations}"
|
||||
);
|
||||
|
||||
// Drop the rotator first to abort the background task and tear down the active circuit. The
|
||||
// actors then exit naturally as their accept loops drop.
|
||||
drop(rotator);
|
||||
relay_handle.abort();
|
||||
exit_handle.abort();
|
||||
// Best-effort wait so the actor tasks unblock the runtime before the test runs to completion.
|
||||
let _ = tokio::time::timeout(Duration::from_millis(200), async {
|
||||
let _ = relay_handle.await;
|
||||
let _ = exit_handle.await;
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
/// `RotatingCircuit::new` propagates any error from the initial [`circuit::dial_circuit`] — if
|
||||
/// the entry relay is unreachable, construction fails synchronously without spawning the
|
||||
/// background task. This guarantees the caller does not get a "zombie" rotator hammering an
|
||||
/// unreachable address.
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn rotating_circuit_initial_dial_failure_is_synchronous() {
|
||||
let ca = AuraCa::generate("Aura v3.3 rotation init-fail Test CA").expect("ca");
|
||||
|
||||
// Two reachable-but-pointing-at-nothing addresses. The `UdpClient::connect` to either will
|
||||
// time out, the initial dial_circuit returns Err, and `RotatingCircuit::new` propagates it.
|
||||
let bogus1: SocketAddr = "127.0.0.1:1".parse().unwrap();
|
||||
let bogus2: SocketAddr = "127.0.0.1:2".parse().unwrap();
|
||||
|
||||
let hops = vec![
|
||||
HopConfig {
|
||||
addr: bogus1,
|
||||
proto_cfg: client_cfg(&ca, RELAY_SAN),
|
||||
},
|
||||
HopConfig {
|
||||
addr: bogus2,
|
||||
proto_cfg: client_cfg(&ca, EXIT_SAN),
|
||||
},
|
||||
];
|
||||
|
||||
// Use a short connect timeout via the UDP opts default; we still bound the test in case the
|
||||
// dial library hangs for longer than expected.
|
||||
let res = tokio::time::timeout(
|
||||
Duration::from_secs(30),
|
||||
RotatingCircuit::new(hops, UdpOpts::default(), Duration::from_secs(60)),
|
||||
)
|
||||
.await
|
||||
.expect("RotatingCircuit::new returned within 30 s");
|
||||
|
||||
let err = match res {
|
||||
Ok(_) => panic!("RotatingCircuit::new must fail when the entry hop is unreachable"),
|
||||
Err(e) => e,
|
||||
};
|
||||
let msg = format!("{err:#}");
|
||||
// The error chain includes "initial dial_circuit" from our context() wrapper.
|
||||
assert!(
|
||||
msg.contains("initial dial_circuit") || msg.contains("dial entry hop"),
|
||||
"expected initial-dial error, got: {msg}"
|
||||
);
|
||||
// Ensure circuit module is still callable directly (no global side-effects from the failed
|
||||
// construction — just a smoke check that the test runs cleanly).
|
||||
let _ = circuit::dial_circuit_shared_cfg;
|
||||
}
|
||||
Reference in New Issue
Block a user