feat(cli): v3.1 multi-hop runtime — circuit client + relay rendezvous

Completes v3.1 multi-hop / onion routing (2 hops: client → entry-relay →
exit-server). Combined with the scaffold commit (6c14c0d), the property
holds: entry-relay knows the client IP + client_id but cannot decrypt the
data; exit knows the destination but sees the relay's IP as source.

- aura-cli::circuit: dial_circuit(&[entry, exit], proto_cfg, udp_opts) →
  CircuitConnection. Connects to entry as a normal UdpClient, sends an
  ExtendBridge control envelope, awaits CircuitReady, then runs a SECOND
  Aura handshake to the exit through a local loopback UDP proxy — the
  forwarder ferries datagrams between that proxy socket and the outer
  relay PacketConnection. The inner handshake therefore authenticates the
  EXIT cert (verified by the integration test asserting
  circuit.peer_id() == "localhost-exit"); the relay never sees the inner
  session keys.
- aura-cli::relay: rendezvous(conn, whitelist) -> Bridged{bridge} |
  Fallback{first_pkt} | Refused. 2-second window after handshake to receive
  ExtendBridge. Whitelist enforced; CircuitFailed on miss. Empty whitelist
  logs a warning and runs open. Timeout / non-control → Fallback so the
  same server can be both relay (for circuit clients) and exit (for direct
  clients) simultaneously.
- aura-cli::client: when [client.circuit] enabled → dial_circuit; falls
  back to normal aura_transport::dial when disabled.
- aura-cli::server: relay rendezvous wired before pool/CRL/router path.
  run_bridge spawns two forwarder tasks (conn↔bridge UDP socket).
- 3 integration tests: end-to-end (with peer_id assertion), whitelist
  rejection, back-compat (relay disabled → Err). 3 unit tests in relay.rs.

Workspace: 253 tests passed (247 baseline + 6 new), clippy -D warnings clean,
fmt clean. No new workspace deps. All 28 tracked tasks (v1 + v2 + v3.1) now
complete.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xah30
2026-05-27 13:16:07 +03:00
parent 6c14c0d103
commit fe618b839d
9 changed files with 1090 additions and 13 deletions
+290
View File
@@ -0,0 +1,290 @@
//! v3.1 multi-hop / onion routing: the **client side** of the 2-hop circuit
//! `client → entry-relay → exit-server`.
//!
//! ## Wire dance
//!
//! 1. The client opens a normal UDP transport connection to the **entry relay** via
//! [`UdpClient::connect`]. The relay's cert is mutually authenticated by this **outer** Aura
//! handshake.
//! 2. Through the established outer connection, the client sends one
//! [`aura_proto::ControlKind::ExtendBridge`] envelope carrying the literal `IP:port` of the
//! downstream **exit server**.
//! 3. The relay either replies with [`aura_proto::ControlKind::CircuitReady`] (the bridge to the
//! exit is up; every subsequent byte travels opaquely) or
//! [`aura_proto::ControlKind::CircuitFailed`] (the relay refused — payload is a UTF-8 reason).
//! 4. Once `CircuitReady` arrives the client opens a **local proxy UDP socket** on loopback and
//! runs a second [`UdpClient::connect`] **at that loopback address** — this is the **inner**
//! handshake, addressed semantically to the exit-server. A background forwarder ferries every
//! datagram between the local proxy socket and the outer relay connection: the relay extracts
//! each datagram and ships it to the exit verbatim. The exit therefore runs an ordinary
//! [`aura_transport::UdpServer`] accepting one connection whose source address is the relay's
//! bridge socket.
//!
//! Result: traffic is wrapped under **two AEAD layers** — first the exit's session keys (inner
//! handshake) and again the relay's session keys (outer handshake). The exit knows the client's
//! certificate CN but not the client's real source IP; the relay knows the client's source IP but
//! not the destination IP nor a single plaintext byte.
//!
//! ## Why a local proxy UDP socket?
//!
//! The Aura UDP transport (`aura_transport::udp`) is built around a [`tokio::net::UdpSocket`]: its
//! reliable-handshake adapter writes/reads complete datagrams with a 1-byte type prefix
//! (`0x01` HS, `0x02` DATA). Re-using the transport without that socket would mean re-implementing
//! the whole reliability layer. The loopback proxy is the smallest hack that lets the inner
//! [`UdpClient`] talk over its expected datagram interface while every datagram is actually being
//! tunnelled through the outer relay connection.
use std::net::SocketAddr;
use std::sync::Arc;
use anyhow::{anyhow, bail, Context};
use async_trait::async_trait;
use aura_proto::{
decode_control_envelope, encode_control_envelope, encode_extend_bridge, ClientConfig,
ControlKind, PacketConnection,
};
use aura_transport::{UdpClient, UdpConnection, UdpOpts};
use tokio::net::UdpSocket;
use tokio::task::JoinHandle;
/// How long the client waits for the relay to reply with [`ControlKind::CircuitReady`] (or
/// [`ControlKind::CircuitFailed`]) after sending the [`ControlKind::ExtendBridge`] envelope.
const READY_TIMEOUT_SECS: u64 = 5;
/// An established 2-hop circuit: it is **literally** a [`UdpConnection`] in disguise. The inner
/// connection's outgoing datagrams go to a local proxy socket, which forwards them through the
/// outer relay connection to the exit. From the inner handshake / data exchange's point of view
/// nothing is special — it is talking to a normal Aura UDP server.
///
/// The two background tasks (proxy forwarders) and the outer connection are owned here, so dropping
/// the circuit tears everything down in order.
pub struct CircuitConnection {
/// The inner UDP connection (target of the second handshake addressed to the exit). All
/// `send_packet` / `recv_packet` go through this; the proxy forwarder splices the bytes onto
/// the outer relay connection.
inner: UdpConnection,
/// Outer relay connection — pinned alive for the lifetime of the circuit. The forwarder owns
/// clones, but holding it here means the outer is dropped at exactly the same time as `Self`.
_outer_conn_holder: Arc<dyn PacketConnection>,
/// Background task: local proxy socket ↔ outer relay connection. Aborted in [`Drop`].
forwarder: JoinHandle<()>,
/// Local proxy socket kept alive for the forwarder's lifetime (the forwarder also holds an
/// `Arc<UdpSocket>` clone, but this prevents close-on-last-clone races during shutdown).
_proxy_socket: Arc<UdpSocket>,
}
impl Drop for CircuitConnection {
fn drop(&mut self) {
self.forwarder.abort();
}
}
impl CircuitConnection {
/// The verified peer Common Name as learned during the **inner** handshake. This is the
/// **exit-server's** identity (NOT the relay's) — the whole point of multi-hop is that the
/// inner handshake authenticates the exit through the relay opaquely.
#[must_use]
pub fn peer_id(&self) -> Option<&str> {
self.inner.peer_id()
}
/// Promote into a trait object so the router / dialer layer can treat the circuit the same way
/// it treats a single-hop UDP / TCP / QUIC connection.
#[must_use]
pub fn into_dyn(self) -> Arc<dyn PacketConnection> {
Arc::new(self)
}
}
#[async_trait]
impl PacketConnection for CircuitConnection {
async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> {
// Delegate to the inner UdpConnection — the proxy forwarder picks up its outgoing
// datagrams from the local proxy socket and tunnels them through the outer relay.
self.inner.send_packet(packet).await
}
async fn recv_packet(&self) -> anyhow::Result<Vec<u8>> {
self.inner.recv_packet().await
}
}
/// Build a 2-hop circuit `client → hops[0] (entry relay) → hops[1] (exit server)` and return it
/// as a [`CircuitConnection`].
///
/// Both hops are reached via the [`UdpClient`] transport in v3.1. `proto_cfg.server_name` is used
/// by the **inner** handshake to verify the EXIT's certificate SAN. The relay's own cert is also
/// CA-verified by the outer handshake; pass [`dial_circuit_with_relay_name`] when the relay's SAN
/// differs from the exit's.
///
/// # Errors
/// * The outer UDP connection to the entry relay failed.
/// * The relay refused (`CircuitFailed`) or did not reply within [`READY_TIMEOUT_SECS`] seconds.
/// * The inner Aura handshake (through the relay) failed (bad exit cert chain, SAN mismatch, etc.).
pub async fn dial_circuit(
hops: &[SocketAddr],
proto_cfg: ClientConfig,
udp_opts: UdpOpts,
) -> anyhow::Result<CircuitConnection> {
dial_circuit_with_relay_name(hops, proto_cfg, udp_opts, None).await
}
/// Variant of [`dial_circuit`] letting the caller override the SAN expected on the relay's cert
/// (the outer handshake) independently of the exit's expected SAN (`proto_cfg.server_name`, used
/// by the inner handshake). See [`dial_circuit`] for the high-level wire dance.
pub async fn dial_circuit_with_relay_name(
hops: &[SocketAddr],
proto_cfg: ClientConfig,
udp_opts: UdpOpts,
relay_server_name: Option<&str>,
) -> anyhow::Result<CircuitConnection> {
if hops.len() != 2 {
bail!(
"v3.1 multi-hop requires exactly 2 hops (entry, exit), got {}",
hops.len()
);
}
let entry = hops[0];
// 1) Dial entry via the existing UDP transport. The outer mutual-auth handshake against the
// relay's certificate runs here; when `relay_server_name` is supplied the verifier
// validates the relay's SAN against that name instead of the exit's.
let mut outer_cfg = proto_cfg.clone();
if let Some(name) = relay_server_name {
outer_cfg.server_name = name.to_string();
}
let outer = UdpClient::connect(entry, outer_cfg, udp_opts)
.await
.with_context(|| format!("dial entry relay at {entry}"))?;
let outer: Arc<dyn PacketConnection> = outer.into_dyn();
// 2) Send the ExtendBridge control envelope describing the downstream exit address.
let exit = hops[1];
let payload = encode_extend_bridge(exit);
let envelope = encode_control_envelope(ControlKind::ExtendBridge, &payload);
outer
.send_packet(&envelope)
.await
.context("send ExtendBridge to relay")?;
// 3) Wait for CircuitReady (with a hard timeout). The relay may send unrelated control
// envelopes in front of ours (e.g. a CRL push from the v2 path) — those are ignored until
// the expected envelope arrives or the deadline elapses.
let ready_deadline =
tokio::time::Instant::now() + std::time::Duration::from_secs(READY_TIMEOUT_SECS);
loop {
let now = tokio::time::Instant::now();
if now >= ready_deadline {
bail!("timeout waiting for CircuitReady from relay at {entry}");
}
let remaining = ready_deadline - now;
let pkt = tokio::time::timeout(remaining, outer.recv_packet())
.await
.map_err(|_| anyhow!("timeout waiting for CircuitReady from relay at {entry}"))?
.context("recv from entry relay")?;
match decode_control_envelope(&pkt) {
Ok(Some((ControlKind::CircuitReady, _))) => break,
Ok(Some((ControlKind::CircuitFailed, reason))) => {
let r = String::from_utf8_lossy(&reason);
bail!("relay refused circuit: {r}");
}
Ok(Some((other, _))) => {
tracing::debug!(
kind = ?other,
"ignoring unexpected control envelope while waiting for CircuitReady"
);
continue;
}
Ok(None) => {
tracing::debug!("ignoring non-control packet from relay before CircuitReady");
continue;
}
Err(e) => {
tracing::debug!(error = %e, "malformed envelope from relay before CircuitReady");
continue;
}
}
}
// 4) Bring up the local proxy UDP socket. The inner UdpClient will `connect()` to its address;
// every datagram it sends goes through the forwarder below to the outer relay connection,
// and every datagram the relay forwards from the exit is replayed back to the inner socket.
let proxy_socket = UdpSocket::bind("127.0.0.1:0")
.await
.context("bind local circuit proxy socket")?;
let proxy_addr = proxy_socket
.local_addr()
.context("read local proxy address")?;
let proxy_socket = Arc::new(proxy_socket);
// 5) Spawn the forwarder BEFORE running the inner handshake — the handshake's first datagram
// must already be flowing while it is being written.
let outer_for_send = Arc::clone(&outer);
let outer_for_recv = Arc::clone(&outer);
let proxy_for_send = Arc::clone(&proxy_socket);
let proxy_for_recv = Arc::clone(&proxy_socket);
let forwarder = tokio::spawn(async move {
// Source address of the inner UdpClient, learned from its first datagram on the proxy
// socket. We need it to know where to deliver `outer.recv_packet` payloads back.
let inner_peer: Arc<tokio::sync::Mutex<Option<SocketAddr>>> =
Arc::new(tokio::sync::Mutex::new(None));
// Task A: proxy.recv_from → outer.send_packet
let inner_peer_a = Arc::clone(&inner_peer);
let to_outer = async move {
let mut buf = vec![0u8; 4096];
loop {
let (n, from) = match proxy_for_recv.recv_from(&mut buf).await {
Ok(v) => v,
Err(_) => break,
};
{
let mut latch = inner_peer_a.lock().await;
if latch.is_none() {
*latch = Some(from);
}
}
if outer_for_send.send_packet(&buf[..n]).await.is_err() {
break;
}
}
};
// Task B: outer.recv_packet → proxy.send_to(inner_peer_addr)
let inner_peer_b = Arc::clone(&inner_peer);
let from_outer = async move {
loop {
let pkt = match outer_for_recv.recv_packet().await {
Ok(p) => p,
Err(_) => break,
};
let dest = { *inner_peer_b.lock().await };
if let Some(dest) = dest {
if proxy_for_send.send_to(&pkt, dest).await.is_err() {
break;
}
}
// Else: the inner UdpClient has not sent its first datagram yet; drop. (The
// reliable adapter will retransmit on its RTO timer.) This race window is tiny —
// we always spawn the forwarder before `UdpClient::connect`.
}
};
tokio::select! {
_ = to_outer => {}
_ = from_outer => {}
}
});
// 6) Inner Aura handshake addressed to the EXIT, via the local proxy. The peer_id we capture
// is the exit's verified CN (the core invariant: the inner handshake authenticates the
// exit, not the relay).
let inner = UdpClient::connect(proxy_addr, proto_cfg, udp_opts)
.await
.context("inner handshake to exit through relay")?;
Ok(CircuitConnection {
inner,
_outer_conn_holder: outer,
forwarder,
_proxy_socket: proxy_socket,
})
}
+32 -9
View File
@@ -24,11 +24,12 @@ use std::path::Path;
use std::sync::Arc;
use anyhow::Context;
use aura_transport::dial;
use aura_transport::{dial, TransportMode};
use aura_tunnel::{AuraDns, AuraRouter, AuraTun, RouteAction};
use tokio::sync::RwLock;
use crate::admin::{self, AdminState, Stats};
use crate::circuit;
use crate::config::{expand_tilde, ClientConfigFile};
use crate::crl_push::AcceptPushedCrlConn;
use crate::masks::MaskRotator;
@@ -96,14 +97,36 @@ pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> {
let routes = Arc::new(RwLock::new(table));
let stats = Arc::new(Stats::new());
// Dial: try each transport in `[transport] order` (UDP→TCP→QUIC handover) until one connects.
// Each transport runs the inner Aura mutual-auth handshake; the winner is returned as a uniform
// `Arc<dyn PacketConnection>` along with which mode carried it. (The trait object does not surface
// the verified server CN; the server identity was already checked against `[client] sni` inside
// the handshake, so we record that as the peer for the admin/status mirror.)
let (conn, mode) = dial(proto_cfg.clone(), dial_cfg)
.await
.context("connecting to Aura server")?;
// Dial: when [client.circuit] is enabled, build a 2-hop circuit `client → entry-relay → exit`
// via [`circuit::dial_circuit`]. Otherwise fall back to the v2 single-hop dial across the
// configured [transport] order. In both cases the result is a uniform `Arc<dyn PacketConnection>`
// so the downstream router does not care which path was taken.
let (conn, mode) = if cfg.circuit.enabled {
let hops = cfg
.circuit_hops()
.context("parsing [client.circuit] hops")?;
tracing::info!(
entry = %hops[0],
exit = %hops[1],
"building v3.1 2-hop circuit"
);
let circuit_conn = circuit::dial_circuit(&hops, proto_cfg.clone(), dial_cfg.udp)
.await
.context("building multi-hop circuit (v3.1)")?;
let peer_id = circuit_conn.peer_id().map(str::to_owned);
tracing::info!(
peer = ?peer_id,
"v3.1 circuit established (inner handshake authenticated the EXIT server)"
);
(circuit_conn.into_dyn(), TransportMode::Udp)
} else {
// Each transport runs the inner Aura mutual-auth handshake; the winner is returned along
// with which mode carried it. (The trait object does not surface the verified server CN;
// the server identity was already checked against `[client] sni` inside the handshake.)
dial(proto_cfg.clone(), dial_cfg)
.await
.context("connecting to Aura server")?
};
let peer = Some(cfg.client.sni.clone());
stats.set_peer_id(peer.clone());
tracing::info!(peer = ?peer, %mode, "connected and authenticated to server");
+3 -3
View File
@@ -896,9 +896,9 @@ impl ClientConfigFile {
pub fn circuit_hops(&self) -> anyhow::Result<Vec<SocketAddr>> {
let mut out = Vec::with_capacity(self.circuit.hops.len());
for raw in &self.circuit.hops {
let addr: SocketAddr = raw
.parse()
.with_context(|| format!("invalid [client.circuit] hop '{raw}' (expected IP:port)"))?;
let addr: SocketAddr = raw.parse().with_context(|| {
format!("invalid [client.circuit] hop '{raw}' (expected IP:port)")
})?;
out.push(addr);
}
if self.circuit.enabled && out.len() != 2 {
+2
View File
@@ -14,6 +14,7 @@
pub mod admin;
pub mod bench;
pub mod circuit;
pub mod client;
pub mod config;
pub mod crl_push;
@@ -26,5 +27,6 @@ pub mod os_routes;
pub mod pki;
pub mod pool;
pub mod privdrop;
pub mod relay;
pub mod server;
pub mod server_router;
+339
View File
@@ -0,0 +1,339 @@
//! v3.1 multi-hop / onion routing: the **server (entry-relay) side**.
//!
//! Companion to [`crate::circuit`]. When `[server.relay] enabled = true`, the server's accept
//! loop performs a short **rendezvous** on each fresh client connection: it waits up to
//! [`EXTEND_RENDEZVOUS_SECS`] seconds for a first packet, and:
//!
//! * If the packet decodes as a [`ControlKind::ExtendBridge`] envelope, the server resolves the
//! downstream `exit_addr`, checks it against the configured whitelist, opens a raw UDP socket
//! to the exit, sends [`ControlKind::CircuitReady`] back to the client, and starts two
//! forwarder tasks — one in each direction — splicing the client's [`PacketConnection`] to the
//! bridge socket. The connection is NOT registered with the [`crate::server_router::ServerRouter`];
//! bridged peers do not consume an IP from the pool.
//! * Otherwise the packet is replayed back into a fallback channel and the accept loop continues
//! handling the connection as a normal VPN client. This dual-role mode lets one server be a
//! relay for some peers and an exit for others, depending on what each client chose to send first.
//!
//! ## Whitelist semantics
//!
//! `[server.relay] allow_extend_to` is parsed by
//! [`ServerConfigFile::relay_whitelist`](crate::config::ServerConfigFile::relay_whitelist) into a
//! `Vec<SocketAddr>`. An empty whitelist is treated as **open relay** — every `exit_addr` is
//! accepted — and we emit a `warn` log so the operator notices the dangerous configuration. A
//! non-empty whitelist that does not contain the requested `exit_addr` causes us to reply with
//! [`ControlKind::CircuitFailed`] (payload: `"not in allow_extend_to"`) and drop the connection.
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use aura_proto::{
decode_control_envelope, decode_extend_bridge, encode_control_envelope, ControlKind,
PacketConnection,
};
use tokio::net::UdpSocket;
/// How long the relay waits for the client's first packet on a fresh connection before falling
/// back to treating the connection as a normal VPN client. Two seconds is comfortably longer than
/// a loopback round-trip (the client sends `ExtendBridge` immediately after the outer handshake
/// returns) but short enough that fallback clients do not perceive a stall.
pub const EXTEND_RENDEZVOUS_SECS: u64 = 2;
/// Outcome of the [`rendezvous`] phase on a fresh connection.
///
/// * [`RendezvousOutcome::Bridged`] — the client sent [`ControlKind::ExtendBridge`]; the bridge
/// socket has been opened and the relay can now spawn the forwarders. The caller MUST NOT
/// register this connection with the IP pool / router.
/// * [`RendezvousOutcome::Fallback`] — no `ExtendBridge` arrived in time, or the first packet
/// was not a control envelope. The caller should resume the v2 path and treat the connection
/// as a normal VPN client.
/// * [`RendezvousOutcome::Refused`] — the client asked for an exit that is not on the whitelist;
/// the relay has already replied with [`ControlKind::CircuitFailed`] and the caller should drop
/// the connection.
pub enum RendezvousOutcome {
/// The connection is now a bridge to `bridge` (a UDP socket connected to the exit). The
/// caller should spawn [`run_bridge`] to ferry packets.
Bridged { bridge: Arc<UdpSocket> },
/// No bridge was requested; the connection is a normal VPN client. `first_pkt` is the first
/// packet the caller observed during the rendezvous window (if any) so it can be replayed
/// into the normal processing pipeline; in v3.1 we drop it (the v2 path expects to call
/// `recv_packet` itself from a clean state) — see the callsite for details.
Fallback { first_pkt: Option<Vec<u8>> },
/// The client asked for an exit not on the whitelist. The caller should drop the connection.
Refused,
}
/// Perform the rendezvous on a freshly-accepted relay connection.
///
/// Reads (with a [`EXTEND_RENDEZVOUS_SECS`] timeout) the next packet from `conn`. When it decodes
/// as [`ControlKind::ExtendBridge`] and the requested exit is whitelisted, this function:
///
/// 1. Binds a UDP socket on `0.0.0.0:0` (`[::]:0` for an IPv6 exit) and `connect()`s it to the
/// exit address.
/// 2. Sends [`ControlKind::CircuitReady`] back to the client.
/// 3. Returns [`RendezvousOutcome::Bridged`] with the bridge socket.
///
/// On a whitelist miss it sends [`ControlKind::CircuitFailed`] and returns
/// [`RendezvousOutcome::Refused`]. On any timeout / non-control / decode failure it returns
/// [`RendezvousOutcome::Fallback`] so the caller can continue with the v2 VPN-client path.
pub async fn rendezvous(
conn: &Arc<dyn PacketConnection>,
whitelist: &[SocketAddr],
) -> RendezvousOutcome {
let pkt = match tokio::time::timeout(
Duration::from_secs(EXTEND_RENDEZVOUS_SECS),
conn.recv_packet(),
)
.await
{
Ok(Ok(p)) => p,
Ok(Err(e)) => {
tracing::debug!(error = %e, "relay rendezvous: recv failed; fallback to VPN client path");
return RendezvousOutcome::Fallback { first_pkt: None };
}
Err(_) => {
tracing::debug!(
"relay rendezvous: no ExtendBridge within {EXTEND_RENDEZVOUS_SECS}s; \
fallback to VPN client path"
);
return RendezvousOutcome::Fallback { first_pkt: None };
}
};
let envelope = match decode_control_envelope(&pkt) {
Ok(Some((kind, payload))) => Some((kind, payload)),
Ok(None) => None,
Err(e) => {
tracing::debug!(error = %e, "relay rendezvous: malformed envelope; fallback");
return RendezvousOutcome::Fallback {
first_pkt: Some(pkt),
};
}
};
let Some((kind, payload)) = envelope else {
tracing::debug!(
"relay rendezvous: first packet is not a control envelope; fallback to VPN client path"
);
return RendezvousOutcome::Fallback {
first_pkt: Some(pkt),
};
};
if kind != ControlKind::ExtendBridge {
tracing::debug!(
kind = ?kind,
"relay rendezvous: first envelope is not ExtendBridge; fallback to VPN client path"
);
return RendezvousOutcome::Fallback {
first_pkt: Some(pkt),
};
}
let exit_addr: SocketAddr = match decode_extend_bridge(&payload) {
Ok(a) => a,
Err(e) => {
tracing::warn!(error = %e, "relay rendezvous: malformed ExtendBridge payload; refusing");
let reply = encode_control_envelope(
ControlKind::CircuitFailed,
b"malformed ExtendBridge payload",
);
let _ = conn.send_packet(&reply).await;
return RendezvousOutcome::Refused;
}
};
// Whitelist enforcement. Empty whitelist == open relay (operator was warned via the log line
// emitted when the section was loaded; we also re-log here so each accepted bridge leaves a
// breadcrumb).
if whitelist.is_empty() {
tracing::warn!(
exit = %exit_addr,
"relay running as OPEN relay (allow_extend_to is empty); accepting bridge"
);
} else if !whitelist.contains(&exit_addr) {
tracing::warn!(
exit = %exit_addr,
"relay rejecting bridge: exit not in allow_extend_to"
);
let reply = encode_control_envelope(ControlKind::CircuitFailed, b"not in allow_extend_to");
let _ = conn.send_packet(&reply).await;
return RendezvousOutcome::Refused;
}
// Open the bridge socket. We bind matching the exit's address family so a relay running on a
// dual-stack host does not accidentally try to use an IPv4 socket to reach an IPv6 exit.
let bind_addr: SocketAddr = if exit_addr.is_ipv4() {
"0.0.0.0:0".parse().expect("valid v4 bind addr")
} else {
"[::]:0".parse().expect("valid v6 bind addr")
};
let bridge = match UdpSocket::bind(bind_addr).await {
Ok(s) => s,
Err(e) => {
tracing::warn!(error = %e, exit = %exit_addr, "relay could not bind bridge socket");
let msg = format!("bridge bind failed: {e}");
let reply = encode_control_envelope(ControlKind::CircuitFailed, msg.as_bytes());
let _ = conn.send_packet(&reply).await;
return RendezvousOutcome::Refused;
}
};
if let Err(e) = bridge.connect(exit_addr).await {
tracing::warn!(error = %e, exit = %exit_addr, "relay could not connect bridge socket to exit");
let msg = format!("bridge connect failed: {e}");
let reply = encode_control_envelope(ControlKind::CircuitFailed, msg.as_bytes());
let _ = conn.send_packet(&reply).await;
return RendezvousOutcome::Refused;
}
let ready = encode_control_envelope(ControlKind::CircuitReady, &[]);
if let Err(e) = conn.send_packet(&ready).await {
tracing::warn!(error = %e, "relay failed to send CircuitReady; dropping");
return RendezvousOutcome::Refused;
}
tracing::info!(
exit = %exit_addr,
"relay rendezvous succeeded; bridging client to exit"
);
RendezvousOutcome::Bridged {
bridge: Arc::new(bridge),
}
}
/// Splice a client-side [`PacketConnection`] to a `connect()`ed bridge UDP socket, ferrying bytes
/// in both directions until either side closes. Drives **two** tasks (each direction) and joins
/// them so the function returns when both have ended.
///
/// The relay never decrypts the inner Aura handshake / data: bytes from the client are sent as
/// raw UDP datagrams to the exit, and bytes from the exit are wrapped back in a
/// [`PacketConnection::send_packet`] call on the client connection. This is what makes the
/// `client ↔ exit` handshake travel through the relay opaquely.
pub async fn run_bridge(client_conn: Arc<dyn PacketConnection>, bridge: Arc<UdpSocket>) {
let conn_a = Arc::clone(&client_conn);
let br_a = Arc::clone(&bridge);
let to_exit = tokio::spawn(async move {
while let Ok(buf) = conn_a.recv_packet().await {
if br_a.send(&buf).await.is_err() {
break;
}
}
});
let conn_b = Arc::clone(&client_conn);
let br_b = Arc::clone(&bridge);
let to_client = tokio::spawn(async move {
let mut buf = vec![0u8; 2048];
while let Ok(n) = br_b.recv(&mut buf).await {
if conn_b.send_packet(&buf[..n]).await.is_err() {
break;
}
}
});
let _ = tokio::join!(to_exit, to_client);
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::VecDeque;
use async_trait::async_trait;
use aura_proto::encode_extend_bridge;
use tokio::sync::Mutex as TokioMutex;
/// In-memory mock that lets us drive [`rendezvous`] without a real Aura connection.
struct MockConn {
to_recv: TokioMutex<VecDeque<anyhow::Result<Vec<u8>>>>,
sent: TokioMutex<Vec<Vec<u8>>>,
}
impl MockConn {
fn new(items: impl IntoIterator<Item = anyhow::Result<Vec<u8>>>) -> Arc<Self> {
Arc::new(Self {
to_recv: TokioMutex::new(items.into_iter().collect()),
sent: TokioMutex::new(Vec::new()),
})
}
}
#[async_trait]
impl PacketConnection for MockConn {
async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> {
self.sent.lock().await.push(packet.to_vec());
Ok(())
}
async fn recv_packet(&self) -> anyhow::Result<Vec<u8>> {
match self.to_recv.lock().await.pop_front() {
Some(item) => item,
None => {
// Block forever — the caller's timeout will trip first. Use
// `std::future::pending` so we do not pull in a `futures` dep.
std::future::pending::<()>().await;
unreachable!("pending future never resolves")
}
}
}
}
/// An ExtendBridge to an exit that is **not** on the whitelist is refused: the relay sends
/// CircuitFailed back and the rendezvous outcome is `Refused`.
#[tokio::test]
async fn whitelist_miss_refuses_with_circuit_failed() {
let target: SocketAddr = "203.0.113.5:443".parse().unwrap();
let allowed: SocketAddr = "203.0.113.99:443".parse().unwrap();
let payload = encode_extend_bridge(target);
let envelope = encode_control_envelope(ControlKind::ExtendBridge, &payload);
// Keep a typed handle to the mock so we can introspect what was sent without unsafe.
let mock = MockConn::new([Ok(envelope)]);
let conn: Arc<dyn PacketConnection> = mock.clone();
let outcome = rendezvous(&conn, &[allowed]).await;
assert!(matches!(outcome, RendezvousOutcome::Refused));
// Verify the relay actually answered with a CircuitFailed envelope.
let sent = mock.sent.lock().await.clone();
assert_eq!(sent.len(), 1, "exactly one reply was sent");
let (kind, reason) = decode_control_envelope(&sent[0]).unwrap().unwrap();
assert_eq!(kind, ControlKind::CircuitFailed);
assert_eq!(
std::str::from_utf8(&reason).unwrap(),
"not in allow_extend_to"
);
}
/// Empty whitelist == open relay. A target that is anywhere succeeds (we open the bridge
/// against loopback so the bind / connect actually succeed in the test).
#[tokio::test]
async fn empty_whitelist_acts_as_open_relay() {
// Reserve a free UDP port for the dummy exit so connect() succeeds on the bridge side.
let exit_sock = std::net::UdpSocket::bind("127.0.0.1:0").unwrap();
let exit_addr = exit_sock.local_addr().unwrap();
let payload = encode_extend_bridge(exit_addr);
let envelope = encode_control_envelope(ControlKind::ExtendBridge, &payload);
let conn: Arc<dyn PacketConnection> = MockConn::new([Ok(envelope)]);
let outcome = rendezvous(&conn, &[]).await;
assert!(matches!(outcome, RendezvousOutcome::Bridged { .. }));
}
/// When no packet arrives within the rendezvous window, fall back to the normal VPN-client
/// path. The relay does not send any reply.
#[tokio::test]
async fn timeout_falls_back_to_vpn_client_path() {
// Pass an empty mock so recv_packet blocks forever and the rendezvous timeout trips.
let conn: Arc<dyn PacketConnection> = MockConn::new([]);
// Tighten Tokio's clock: pause + advance is not appropriate here because rendezvous uses
// real timeouts (Duration::from_secs(2)); simply waiting in CI is fine because the test
// path is small. To keep CI fast, bump the timeout up: the test sets up a recv that
// blocks forever, so we want the rendezvous's own timeout to fire — that is the assertion.
//
// We use a `Box::pin(...)` + select to bound the test itself in case the rendezvous never
// returns (a regression).
let result = tokio::time::timeout(
std::time::Duration::from_secs(EXTEND_RENDEZVOUS_SECS + 2),
rendezvous(&conn, &[]),
)
.await
.expect("rendezvous returned within deadline");
assert!(matches!(
result,
RendezvousOutcome::Fallback { first_pkt: None }
));
}
}
+74 -1
View File
@@ -31,7 +31,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Context;
use aura_transport::MultiServer;
use aura_transport::{MultiServer, TransportMode};
use aura_tunnel::{AuraTun, RouteAction, RouteTable};
use ipnetwork::IpNetwork;
use tokio::sync::RwLock;
@@ -43,6 +43,7 @@ use crate::masks::MaskRotator;
use crate::nat::NatGuard;
use crate::pool::IpPool;
use crate::privdrop;
use crate::relay::{self, RendezvousOutcome};
use crate::server_router::ServerRouter;
/// Entry point for `aura server --config <PATH>` (and optional `--admin-socket`).
@@ -243,10 +244,45 @@ pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> {
}
});
// v3.1: when [server.relay] is enabled, parse the whitelist once and log a warning if it is
// empty (open relay). The whitelist is a `Vec<SocketAddr>`; an empty list means "all
// addresses allowed" (dangerous; see the section's docs).
let relay_enabled = cfg.server.relay.enabled;
let relay_whitelist: Vec<std::net::SocketAddr> = if relay_enabled {
let wl = cfg.relay_whitelist();
if wl.is_empty() {
tracing::warn!(
"[server.relay] is enabled with an EMPTY allow_extend_to — running as OPEN relay; \
every ExtendBridge request will be accepted. Set allow_extend_to to a curated list."
);
} else {
tracing::info!(
count = wl.len(),
"[server.relay] enabled with {} whitelisted exit address(es)",
wl.len()
);
}
wl
} else {
Vec::new()
};
// Accept loop. Each accepted connection (from any transport) is assigned an IP from the pool
// and registered with the [`ServerRouter`]; a per-conn task forwards inbound packets into the
// shared TUN. `MultiServer::accept` yields `None` only when every transport's accept loop has
// stopped.
//
// v3.1: when [server.relay] is enabled, every accepted UDP connection first undergoes a short
// **rendezvous** ([`relay::rendezvous`]) to see whether the client wants to be bridged through
// to a downstream exit. The rendezvous:
// * Reads with a 2-second timeout. If an `ExtendBridge` envelope arrives and its `exit_addr`
// is on the whitelist, the relay opens a bridge socket, replies with `CircuitReady`, and
// the connection is spliced byte-for-byte to the exit — NOT registered with the IP pool.
// * If nothing arrives within 2s or the first packet is not an `ExtendBridge` envelope, the
// connection falls back to the normal VPN-client path (IP pool + ServerRouter), exactly as
// in v2. This dual-role mode lets one server be a relay for some peers and an exit for
// others on the same listening port. Non-UDP transports (TCP, QUIC) skip rendezvous in
// v3.1; only UDP is supported as a hop transport.
loop {
let next = {
let mut srv = server.lock().await;
@@ -257,6 +293,43 @@ pub async fn run(config_path: &Path, admin_socket: &str) -> anyhow::Result<()> {
let mode = accepted.mode;
let conn = accepted.conn;
// v3.1 relay rendezvous (only on UDP-mode connections; v3.1 does not bridge TCP / QUIC).
if relay_enabled && mode == TransportMode::Udp {
match relay::rendezvous(&conn, &relay_whitelist).await {
RendezvousOutcome::Bridged { bridge } => {
// Spawn the two forwarder tasks and skip everything else (no IP pool entry,
// no router registration, no CRL push — bridged peers are opaque).
tracing::info!(
peer = ?peer_id, %mode,
"v3.1 relay: bridging connection to exit"
);
let client_conn = Arc::clone(&conn);
tokio::spawn(async move {
relay::run_bridge(client_conn, bridge).await;
});
continue;
}
RendezvousOutcome::Refused => {
tracing::warn!(
peer = ?peer_id, %mode,
"v3.1 relay: refusing connection (CircuitFailed sent); dropping"
);
drop(conn);
continue;
}
RendezvousOutcome::Fallback { .. } => {
// Fall through to the normal VPN-client handling below. (The first packet, if
// any, was either non-existent or non-control — for v3.1 we drop it; control
// envelopes that are not ExtendBridge are not expected on the first packet
// from a v2 client either.)
tracing::debug!(
peer = ?peer_id, %mode,
"v3.1 relay: no ExtendBridge received; handling as normal VPN client"
);
}
}
}
// Pick the client id used for static-pool lookup. The certificate CN is the only
// identity we can trust here; if absent (defensive — every authenticated connection has
// one in practice) fall back to a unique-per-instance marker so dynamic allocation still
+310
View File
@@ -0,0 +1,310 @@
//! v3.1 multi-hop / onion-routing integration test.
//!
//! Drives three actors on loopback in one process:
//!
//! * **Exit** — a vanilla [`UdpServer`] bound on a free UDP port. Its cert SAN is
//! `"localhost-exit"`. The server's accept task echoes the first three received packets back to
//! the sender, then drops.
//!
//! * **Relay** — another [`UdpServer`] on a free port, cert SAN `"localhost-relay"`. Its accept
//! task:
//! 1. accepts one connection (running its own outer Aura mutual-auth handshake with the
//! client),
//! 2. uses [`crate::relay::rendezvous`] to read the client's `ExtendBridge` envelope and open
//! a `connect()`ed UDP socket to the exit,
//! 3. spawns [`crate::relay::run_bridge`] to ferry bytes between the client and the bridge.
//!
//! * **Client** — calls [`circuit::dial_circuit_with_relay_name`] with
//! `relay_server_name = Some("localhost-relay")` and `proto_cfg.server_name = "localhost-exit"`.
//! The returned [`circuit::CircuitConnection`] should have `peer_id() == Some("localhost-exit")`
//! — the core multi-hop invariant: the **inner** handshake authenticated the exit's cert
//! through the relay opaquely, even though the outer hop authenticated the relay's cert.
//!
//! The test then exchanges three packets of varying sizes through the circuit and asserts that
//! every echoed reply matches.
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use aura_cli::circuit;
use aura_cli::relay::{self, RendezvousOutcome};
use aura_pki::AuraCa;
use aura_proto::{ClientConfig, PacketConnection, ServerConfig};
use aura_transport::{UdpOpts, UdpServer};
const EXIT_SAN: &str = "localhost-exit";
const RELAY_SAN: &str = "localhost-relay";
const CLIENT_ID: &str = "client-multihop";
/// Reserve and immediately release a free UDP port on loopback (the window before re-bind in the
/// same process is negligible on a quiet test).
fn free_udp_port() -> u16 {
let sock = std::net::UdpSocket::bind("127.0.0.1:0").expect("bind ephemeral udp");
sock.local_addr().expect("local_addr").port()
}
/// Build a [`ServerConfig`] from one shared CA, with the given SAN.
fn server_cfg(ca: &AuraCa, san: &str) -> ServerConfig {
let issued = ca.issue_server_cert(san).expect("issue server cert");
ServerConfig {
ca_cert_pem: ca.ca_cert_pem(),
server_cert_pem: issued.cert_pem,
server_key_pem: issued.key_pem,
}
}
/// Build a [`ClientConfig`] from one shared CA. `server_name` is used by the **inner** handshake
/// (the exit). The outer handshake's expected SAN is overridden separately at
/// [`circuit::dial_circuit_with_relay_name`] callsite.
fn client_cfg(ca: &AuraCa, server_name: &str) -> ClientConfig {
let issued = ca.issue_client_cert(CLIENT_ID).expect("issue client cert");
ClientConfig {
ca_cert_pem: ca.ca_cert_pem(),
client_cert_pem: issued.cert_pem,
client_key_pem: issued.key_pem,
server_name: server_name.to_string(),
}
}
/// Spawn the exit server: accept one connection and echo the first three packets back.
async fn spawn_exit(server: UdpServer) {
let conn = server.accept().await.expect("exit accept");
// The dropped server keeps the master loop alive via the connection's anchor.
drop(server);
let conn: Arc<dyn PacketConnection> = Arc::new(conn);
for _ in 0..3 {
match conn.recv_packet().await {
Ok(pkt) => {
if conn.send_packet(&pkt).await.is_err() {
return;
}
}
Err(_) => return,
}
}
}
/// Spawn the relay server: accept one connection, run the rendezvous, and bridge to the exit.
async fn spawn_relay(server: UdpServer, whitelist: Vec<SocketAddr>) {
let conn = server.accept().await.expect("relay accept");
drop(server);
let conn: Arc<dyn PacketConnection> = Arc::new(conn);
match relay::rendezvous(&conn, &whitelist).await {
RendezvousOutcome::Bridged { bridge } => {
relay::run_bridge(conn, bridge).await;
}
RendezvousOutcome::Refused => {
// Test path that exercises whitelist refusal — the relay sent CircuitFailed
// already; just exit.
}
RendezvousOutcome::Fallback { .. } => {
// The client did not send ExtendBridge — should not happen in the happy path.
panic!("relay rendezvous fell back unexpectedly");
}
}
}
#[tokio::test(flavor = "multi_thread")]
async fn multihop_v3_1_end_to_end() {
// One shared CA. Each role gets its own server cert with its own SAN.
let ca = AuraCa::generate("Aura Multi-Hop Test CA").expect("ca");
let exit_proto = server_cfg(&ca, EXIT_SAN);
let relay_proto = server_cfg(&ca, RELAY_SAN);
let client_proto = client_cfg(&ca, EXIT_SAN);
let exit_port = free_udp_port();
let relay_port = free_udp_port();
let exit_addr: SocketAddr = format!("127.0.0.1:{exit_port}").parse().unwrap();
let relay_addr: SocketAddr = format!("127.0.0.1:{relay_port}").parse().unwrap();
// Bind both servers BEFORE spawning the client so they are ready to accept.
let exit_server =
UdpServer::bind(exit_addr, exit_proto, UdpOpts::default()).expect("bind exit");
let relay_server =
UdpServer::bind(relay_addr, relay_proto, UdpOpts::default()).expect("bind relay");
let exit_actual = exit_server.local_addr().expect("exit addr");
let relay_actual = relay_server.local_addr().expect("relay addr");
// Whitelist contains exactly the exit address.
let whitelist = vec![exit_actual];
let exit_task = tokio::spawn(spawn_exit(exit_server));
let relay_task = tokio::spawn(spawn_relay(relay_server, whitelist));
// Give the servers a beat to enter their accept loops. Not strictly required (accept is
// resumable) but makes the trace easier to follow on failure.
tokio::time::sleep(Duration::from_millis(20)).await;
// Client: dial circuit. proto_cfg.server_name = "localhost-exit" so the inner handshake's
// verifier checks the exit's SAN; the outer handshake checks the relay's SAN via the explicit
// override.
let circuit_conn = tokio::time::timeout(
Duration::from_secs(30),
circuit::dial_circuit_with_relay_name(
&[relay_actual, exit_actual],
client_proto,
UdpOpts::default(),
Some(RELAY_SAN),
),
)
.await
.expect("dial_circuit did not finish within 30s")
.expect("dial_circuit succeeded");
// The core invariant: the INNER handshake authenticated the EXIT (not the relay).
assert_eq!(
circuit_conn.peer_id(),
Some(EXIT_SAN),
"circuit.peer_id() must be the exit's SAN — the inner handshake verified the exit's cert"
);
// Echo three packets of varying sizes through the circuit.
let payloads: Vec<Vec<u8>> = vec![
b"hello multi-hop".to_vec(),
vec![0xCDu8; 800],
(0..=255u8).collect(),
];
for pkt in &payloads {
circuit_conn.send_packet(pkt).await.expect("circuit send");
let echoed = tokio::time::timeout(Duration::from_secs(5), circuit_conn.recv_packet())
.await
.expect("recv timeout")
.expect("recv from exit through circuit");
assert_eq!(&echoed, pkt, "echoed payload must match");
}
// Clean shutdown — drop the client first, then wait for the actors to finish.
drop(circuit_conn);
let _ = tokio::time::timeout(Duration::from_secs(5), exit_task).await;
let _ = tokio::time::timeout(Duration::from_secs(5), relay_task).await;
}
/// A whitelist that does NOT contain the exit's address must cause `dial_circuit` to fail with an
/// error mentioning "allow_extend_to" (the reason string sent in `CircuitFailed`).
#[tokio::test(flavor = "multi_thread")]
async fn multihop_whitelist_rejects_disallowed_exit() {
let ca = AuraCa::generate("Aura Multi-Hop Test CA").expect("ca");
let exit_proto = server_cfg(&ca, EXIT_SAN);
let relay_proto = server_cfg(&ca, RELAY_SAN);
let client_proto = client_cfg(&ca, EXIT_SAN);
let exit_port = free_udp_port();
let relay_port = free_udp_port();
let exit_addr: SocketAddr = format!("127.0.0.1:{exit_port}").parse().unwrap();
let relay_addr: SocketAddr = format!("127.0.0.1:{relay_port}").parse().unwrap();
let exit_server =
UdpServer::bind(exit_addr, exit_proto, UdpOpts::default()).expect("bind exit");
let relay_server =
UdpServer::bind(relay_addr, relay_proto, UdpOpts::default()).expect("bind relay");
let exit_actual = exit_server.local_addr().expect("exit addr");
let relay_actual = relay_server.local_addr().expect("relay addr");
// Whitelist contains a different (fake) exit; the real exit is NOT allowed.
let fake: SocketAddr = "10.255.255.1:9".parse().unwrap();
let whitelist = vec![fake];
// Exit task: just sit there; we expect the relay never bridges to it.
let _exit_task = tokio::spawn(async move {
// Accept may never resolve; exit when test ends.
let _ = exit_server.accept().await;
});
let relay_task = tokio::spawn(spawn_relay(relay_server, whitelist));
tokio::time::sleep(Duration::from_millis(20)).await;
// dial_circuit must error with a message mentioning "allow_extend_to".
let res = tokio::time::timeout(
Duration::from_secs(15),
circuit::dial_circuit_with_relay_name(
&[relay_actual, exit_actual],
client_proto,
UdpOpts::default(),
Some(RELAY_SAN),
),
)
.await
.expect("dial_circuit_with_relay_name returned within 15s");
let err = match res {
Ok(_) => panic!("dial_circuit must fail when exit is not on the whitelist"),
Err(e) => e,
};
let msg = format!("{err:#}");
assert!(
msg.contains("allow_extend_to") || msg.contains("not in"),
"expected 'allow_extend_to' / 'not in' in error, got: {msg}"
);
let _ = tokio::time::timeout(Duration::from_secs(2), relay_task).await;
}
/// When the v3.1 relay path is **disabled** at the server, the server's accept-side never reads
/// the client's ExtendBridge envelope as a control message — instead the server would treat the
/// connection as a normal VPN client. From the client's `dial_circuit` perspective the relay
/// never sends `CircuitReady`, so the client times out (`READY_TIMEOUT_SECS`-bounded).
///
/// This test exercises that exact fallback: we run a `UdpServer` with NO rendezvous task,
/// accept the connection, and just keep it open. The client's `dial_circuit` must return an Err
/// whose message mentions a timeout / CircuitReady.
#[tokio::test(flavor = "multi_thread")]
async fn multihop_back_compat_relay_disabled() {
let ca = AuraCa::generate("Aura Multi-Hop Test CA").expect("ca");
let exit_proto = server_cfg(&ca, EXIT_SAN);
let relay_proto = server_cfg(&ca, RELAY_SAN);
let client_proto = client_cfg(&ca, EXIT_SAN);
let exit_port = free_udp_port();
let relay_port = free_udp_port();
let exit_addr: SocketAddr = format!("127.0.0.1:{exit_port}").parse().unwrap();
let relay_addr: SocketAddr = format!("127.0.0.1:{relay_port}").parse().unwrap();
let exit_server =
UdpServer::bind(exit_addr, exit_proto, UdpOpts::default()).expect("bind exit");
let relay_server =
UdpServer::bind(relay_addr, relay_proto, UdpOpts::default()).expect("bind relay");
let exit_actual = exit_server.local_addr().expect("exit addr");
let relay_actual = relay_server.local_addr().expect("relay addr");
// Exit task: idle.
let _exit_task = tokio::spawn(async move {
let _ = exit_server.accept().await;
});
// Relay task: just accept and keep the connection alive WITHOUT running the rendezvous. This
// models a v2 server that does not know about `ExtendBridge`. The client's incoming
// `ExtendBridge` envelope is just an opaque payload from the server's perspective.
let relay_task = tokio::spawn(async move {
let conn = relay_server.accept().await.expect("relay accept");
// Hold the connection until the test ends.
tokio::time::sleep(Duration::from_secs(20)).await;
drop(conn);
});
tokio::time::sleep(Duration::from_millis(20)).await;
// The client must time out waiting for CircuitReady.
let res = tokio::time::timeout(
Duration::from_secs(20),
circuit::dial_circuit_with_relay_name(
&[relay_actual, exit_actual],
client_proto,
UdpOpts::default(),
Some(RELAY_SAN),
),
)
.await
.expect("dial_circuit returned within 20s");
let err = match res {
Ok(_) => panic!("dial_circuit must fail when the relay never sends CircuitReady"),
Err(e) => e,
};
let msg = format!("{err:#}");
assert!(
msg.contains("timeout") || msg.contains("CircuitReady"),
"expected timeout / CircuitReady in error, got: {msg}"
);
relay_task.abort();
}