feat(transport): anti-surveillance - UDP port-knocking + cover traffic

Two opt-in (default off) features directly targeting the kind of operator
dragnet described in the news context — make the server harder to identify
on a scan, and the traffic harder to fingerprint by volume/timing analysis.

1) Port-knocking (probe resistance, UDP)
   - Wire: every HS datagram (0x01) is prefixed with a 16-byte HMAC token
     when UdpOpts.knock_required is on:
       knock = HMAC-SHA256(knock_key, u64_be(unix_minute))[..16]
   - Server-side: validates against {now-1, now, now+1} minutes (3-minute
     window for clock skew, constant-time compare). Invalid -> silent drop;
     the port looks closed to scanners.
   - knock_key comes from the CLI (derived from CA fingerprint at the
     deployment layer); transport just consumes it.
   - DATA datagrams unchanged (AEAD already proves legitimacy past hs).

2) Cover traffic (chaff, UDP)
   - Optional background task per UdpConnection: every random delay
     (mean_interval_ms +/- jitter, default 500ms +/- 50%) sends a
     Frame::Ping{seq=random} when no Data was sent in the recent window
     (idle-skip => zero overhead under load). RAII-aborted on Drop.
   - Receiver answers Ping with Pong (existing logic); both are consumed
     internally by recv_packet, invisible to the app.

API: UdpOpts gains knock_required/knock_key/cover_traffic_enabled/
cover_mean_interval_ms/cover_jitter (all defaults preserve v2 behavior).
Helpers exported: knock_for_minute, KNOCK_LEN.

Local deps: hmac 0.12 + sha2 0.10 (already in workspace lockfile, no new
resolution). Workspace: 185 tests passed (+11), clippy -D warnings clean,
fmt clean. 174 baseline tests unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xah30
2026-05-27 11:50:16 +03:00
parent 278bcced95
commit 7d711d8938
6 changed files with 1084 additions and 60 deletions
Generated
+2
View File
@@ -276,11 +276,13 @@ dependencies = [
"aura-pki", "aura-pki",
"aura-proto", "aura-proto",
"bytes", "bytes",
"hmac",
"quinn", "quinn",
"rand 0.8.6", "rand 0.8.6",
"rustls", "rustls",
"rustls-pemfile", "rustls-pemfile",
"rustls-pki-types", "rustls-pki-types",
"sha2",
"thiserror 1.0.69", "thiserror 1.0.69",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
+6
View File
@@ -25,6 +25,12 @@ rustls-pemfile = "2"
# boundary is still the inner Aura handshake, just like for the QUIC backend). Local-only to this # boundary is still the inner Aura handshake, just like for the QUIC backend). Local-only to this
# crate — not a new workspace dependency. # crate — not a new workspace dependency.
tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
# HMAC-SHA256 for UDP port-knocking (probe resistance): the knock token is
# `HMAC(knock_key, u64_be(unix_minute))[..16]`, prefixed on every HS datagram when
# `UdpOpts::knock_required` is enabled. Both already resolved in the workspace lockfile (transitively
# via aura-crypto's deps tree), so no new version is introduced.
hmac = "0.12"
sha2 = "0.10"
[dev-dependencies] [dev-dependencies]
# The loopback integration test mints a CA + server/client certs to drive a real QUIC handshake. # The loopback integration test mints a CA + server/client certs to drive a real QUIC handshake.
+1 -1
View File
@@ -80,7 +80,7 @@ pub use padding::{
}; };
pub use quic::{client_endpoint, server_endpoint, AcceptAnyServerCert}; pub use quic::{client_endpoint, server_endpoint, AcceptAnyServerCert};
pub use tcp::{TcpClient, TcpConnection, TcpOpts, TcpServer, DEFAULT_TCP_ALPN}; pub use tcp::{TcpClient, TcpConnection, TcpOpts, TcpServer, DEFAULT_TCP_ALPN};
pub use udp::{UdpClient, UdpConnection, UdpOpts, UdpServer}; pub use udp::{knock_for_minute, UdpClient, UdpConnection, UdpOpts, UdpServer, KNOCK_LEN};
// Re-export the inner proto trait so downstream crates (the CLI) can name the connection as // Re-export the inner proto trait so downstream crates (the CLI) can name the connection as
// `Arc<dyn aura_transport::PacketConnection>` without a separate `aura_proto` import. // `Arc<dyn aura_transport::PacketConnection>` without a separate `aura_proto` import.
+552 -59
View File
@@ -50,6 +50,7 @@ use std::collections::{BTreeMap, HashMap};
use std::io; use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
@@ -96,15 +97,145 @@ const ACK_NONE: u16 = u16::MAX;
/// ~1253 bytes; data records are MTU-sized; this leaves slack for headers + obfuscation padding). /// ~1253 bytes; data records are MTU-sized; this leaves slack for headers + obfuscation padding).
const RECV_BUF: usize = 2048; const RECV_BUF: usize = 2048;
/// Length of the port-knock token prefixed on each HS datagram when
/// [`UdpOpts::knock_required`] is enabled (truncated HMAC-SHA256 output).
pub const KNOCK_LEN: usize = 16;
// ---------------------------------------------------------------------------------------------
// Time helpers + knock derivation
// ---------------------------------------------------------------------------------------------
/// Current wall-clock minute since the Unix epoch (`floor(now_secs / 60)`).
///
/// Returns 0 if the system clock is reported as before the epoch (extremely unusual; the knock
/// validator's ±1-minute window absorbs the resulting bucket on healthy peers).
fn current_unix_minute() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs() / 60)
.unwrap_or(0)
}
/// Current wall-clock milliseconds since the Unix epoch, for the cover-traffic last-send timestamp.
fn unix_ms() -> u64 {
use std::time::{SystemTime, UNIX_EPOCH};
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
/// Derive the 16-byte port-knock token for `minute` under the shared `key`.
///
/// Wire formula: `HMAC-SHA256(key, u64_be(minute))[..16]`. The server validates against
/// [`current_unix_minute`] and ±1 to tolerate honest clock skew (≈3-minute acceptance window).
///
/// Exposed primarily as a test seam (drive the validator with a fake minute) and so the CLI / a
/// future wire-probe tool can compute the same token; production code does not need to call it
/// directly because the adapter prefixes it on every HS datagram when
/// [`UdpOpts::knock_required`] is set.
pub fn knock_for_minute(key: &[u8; 32], minute: u64) -> [u8; KNOCK_LEN] {
use hmac::{Hmac, Mac};
use sha2::Sha256;
let mut mac = <Hmac<Sha256> as Mac>::new_from_slice(key)
.expect("HMAC accepts any key length, so a 32-byte slice cannot fail");
mac.update(&minute.to_be_bytes());
let tag = mac.finalize().into_bytes();
let mut out = [0u8; KNOCK_LEN];
out.copy_from_slice(&tag[..KNOCK_LEN]);
out
}
/// Constant-time compare of two 16-byte knock tokens. Avoids leaking the index of the first
/// differing byte through timing — a defensive choice; the knock is a coarse probe-resistance
/// filter, not a per-byte secret, but a tight loop is just as cheap as a non-CT compare here.
fn ct_eq_knock(a: &[u8; KNOCK_LEN], b: &[u8; KNOCK_LEN]) -> bool {
let mut acc = 0u8;
for i in 0..KNOCK_LEN {
acc |= a[i] ^ b[i];
}
acc == 0
}
/// Strip the knock prefix from a datagram from a **known** peer when knocking is on. Returns
/// `Some(stripped)` for valid wire layouts, `None` for malformed ones (which the master loop will
/// silently drop):
///
/// * Empty datagram → `None`.
/// * `0x02 ...` (DATA) → passed through unchanged (DATA datagrams are never knock-prefixed).
/// * `knock(16) || 0x01 || ...` (HS, len ≥ 17) → returns the tail starting at the `0x01`.
/// * Anything else → `None`.
///
/// We do **not** re-validate the knock on the already-known-peer path (per the spec: "На датаграмму
/// от известного пира — без проверки knock"). Once an address has registered via a valid first
/// knock, subsequent prefixes are trusted as a wire-format artefact, not a continuing auth check.
fn strip_knock_for_known_peer(dg: &[u8]) -> Option<Vec<u8>> {
if dg.is_empty() {
return None;
}
if dg[0] == TYPE_DATA {
return Some(dg.to_vec());
}
if dg.len() > KNOCK_LEN && dg[KNOCK_LEN] == TYPE_HS {
return Some(dg[KNOCK_LEN..].to_vec());
}
None
}
/// Validate the leading 16-byte knock prefix against `HMAC(key, minute_be)[..16]` for the current
/// Unix-minute and ±1 (a ≈3-minute acceptance window), then return the stripped datagram (with the
/// type byte `TYPE_HS` at index 0). Returns `None` on any wire-format or HMAC failure — the caller
/// silently drops, so a passive probe sees no response.
fn validate_and_strip_knock(dg: &[u8], key: &[u8; 32]) -> Option<Vec<u8>> {
if dg.len() <= KNOCK_LEN || dg[KNOCK_LEN] != TYPE_HS {
return None;
}
let mut prefix = [0u8; KNOCK_LEN];
prefix.copy_from_slice(&dg[..KNOCK_LEN]);
let now = current_unix_minute();
// ±1 minute tolerance. Use saturating_sub to avoid wrapping at the epoch boundary.
let candidates = [now, now.saturating_sub(1), now.saturating_add(1)];
for m in candidates {
let expected = knock_for_minute(key, m);
if ct_eq_knock(&prefix, &expected) {
return Some(dg[KNOCK_LEN..].to_vec());
}
}
None
}
// --------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------
// Options // Options
// --------------------------------------------------------------------------------------------- // ---------------------------------------------------------------------------------------------
/// Tunables for the UDP transport (handshake reliability timers and obfuscation). /// Tunables for the UDP transport (handshake reliability timers, obfuscation, and the two
/// anti-surveillance features).
/// ///
/// [`UdpOpts::default`] is a sensible production default: obfuscation off, a 250 ms retransmit /// [`UdpOpts::default`] is a sensible production default: obfuscation off, a 250 ms retransmit
/// timeout, a 10 s overall handshake deadline, and padding profile `0` (the historical /// timeout, a 10 s overall handshake deadline, padding profile `0` (the historical
/// [`HTTPS_SIZE_BUCKETS`](padding::HTTPS_SIZE_BUCKETS) palette). /// [`HTTPS_SIZE_BUCKETS`](padding::HTTPS_SIZE_BUCKETS) palette), **knock disabled** and
/// **cover traffic disabled**. The two anti-surveillance toggles are opt-in so existing callers
/// keep the pre-feature wire behaviour without any changes.
///
/// ## Probe resistance — UDP port-knocking
///
/// When [`Self::knock_required`] is `true`, the client prefixes a 16-byte HMAC token on **every**
/// HS datagram it sends; the server silently drops any first datagram from an unknown source whose
/// prefix does not validate against the shared [`Self::knock_key`] for the current Unix-minute
/// (with ±1 minute tolerance for clock skew). To a passive scanner the listening UDP port looks
/// closed. The shared key is the SHA-256 of the Aura CA cert DER (the CLI computes it and supplies
/// it here; the transport just consumes the 32 bytes).
///
/// ## Cover traffic — idle-time chaff
///
/// When [`Self::cover_traffic_enabled`] is `true`, an established [`UdpConnection`] runs a
/// background task that injects encrypted [`Frame::Ping`]s during idle periods so the on-wire byte
/// rate stays roughly constant. The interval between attempts is
/// `cover_mean_interval_ms ± cover_jitter` (uniform), and an attempt is **skipped** if any DATA
/// datagram was sent within the previous interval (so user traffic suppresses chaff). The receiver
/// handles each cover Ping exactly like any other Ping (it answers with a Pong and keeps reading)
/// — no application-layer awareness needed.
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
pub struct UdpOpts { pub struct UdpOpts {
/// When `true`, pad every outgoing DATA datagram up to the next bucket of the configured /// When `true`, pad every outgoing DATA datagram up to the next bucket of the configured
@@ -123,6 +254,30 @@ pub struct UdpOpts {
/// How long the post-handshake linger task keeps resending the final flight (so the peer's last /// How long the post-handshake linger task keeps resending the final flight (so the peer's last
/// flight is not lost) before giving up if no DATA datagram arrives. /// flight is not lost) before giving up if no DATA datagram arrives.
pub hs_linger: Duration, pub hs_linger: Duration,
// -- anti-surveillance: probe resistance ----------------------------------------------------
/// When `true`, port-knocking is required on the server side and the client must prefix the
/// 16-byte knock token on every HS datagram (see the type-level "Probe resistance" docs).
/// `[Self::knock_key]` MUST be `Some(...)` when this is `true`; if it is not, both ends behave
/// as if knocking is off and no knock prefix is added or validated. Default `false` for
/// back-compat.
pub knock_required: bool,
/// Shared 32-byte key for the knock HMAC (typically `SHA-256(CA-cert-DER)`). Used only when
/// [`Self::knock_required`] is `true`. Default `None`.
pub knock_key: Option<[u8; 32]>,
// -- anti-surveillance: cover traffic --------------------------------------------------------
/// When `true`, after the handshake the [`UdpConnection`] spawns a background task that injects
/// encrypted [`Frame::Ping`]s during idle periods (see the type-level "Cover traffic" docs).
/// Default `false` for back-compat.
pub cover_traffic_enabled: bool,
/// Mean interval, in milliseconds, between cover-traffic attempts. Default `500`. Effective
/// only when [`Self::cover_traffic_enabled`] is `true`.
pub cover_mean_interval_ms: u64,
/// Uniform jitter fraction applied to [`Self::cover_mean_interval_ms`] (e.g. `0.5` gives
/// ±50%, so the effective interval is `mean * (1 ± 0.5)`). Clamped into `[0.0, 1.0)`. Default
/// `0.5`.
pub cover_jitter: f32,
} }
impl Default for UdpOpts { impl Default for UdpOpts {
@@ -133,6 +288,11 @@ impl Default for UdpOpts {
hs_rto: Duration::from_millis(250), hs_rto: Duration::from_millis(250),
hs_timeout: Duration::from_secs(10), hs_timeout: Duration::from_secs(10),
hs_linger: Duration::from_secs(2), hs_linger: Duration::from_secs(2),
knock_required: false,
knock_key: None,
cover_traffic_enabled: false,
cover_mean_interval_ms: 500,
cover_jitter: 0.5,
} }
} }
} }
@@ -261,6 +421,10 @@ struct ReliableHsAdapter {
/// Signalled by `poll_write` when new bytes are buffered, so the driver flushes promptly without /// Signalled by `poll_write` when new bytes are buffered, so the driver flushes promptly without
/// busy-polling. /// busy-polling.
write_notify: Arc<tokio::sync::Notify>, write_notify: Arc<tokio::sync::Notify>,
/// Optional port-knock key. When `Some`, **the client** prefixes every outgoing HS datagram with
/// the 16-byte `knock_for_minute(key, current_unix_minute())` token (probe resistance). Set
/// only on the client side (the server never knocks back); always `None` on the server.
knock_key: Option<[u8; 32]>,
} }
/// All mutable state of the reliable handshake adapter. /// All mutable state of the reliable handshake adapter.
@@ -353,17 +517,34 @@ impl ReliableHsAdapter {
socket: Arc<PeerSocket>, socket: Arc<PeerSocket>,
state: Arc<Mutex<HsState>>, state: Arc<Mutex<HsState>>,
write_notify: Arc<tokio::sync::Notify>, write_notify: Arc<tokio::sync::Notify>,
knock_key: Option<[u8; 32]>,
) -> Self { ) -> Self {
Self { Self {
socket, socket,
state, state,
write_notify, write_notify,
knock_key,
} }
} }
/// Build and send one HS datagram carrying `msg_bytes` at sequence `seq` with the current ack. /// Build and send one HS datagram carrying `msg_bytes` at sequence `seq` with the current ack.
async fn send_hs(socket: &PeerSocket, seq: u16, ack_upto: u16, msg_bytes: &[u8]) { ///
let mut dg = Vec::with_capacity(HS_PREFIX_LEN + msg_bytes.len()); /// When `knock_key` is `Some`, the 16-byte port-knock token for the current Unix-minute is
/// prefixed to the datagram (probe-resistance; see [`UdpOpts::knock_required`]). When `None`,
/// the datagram is emitted unchanged — matches the historical wire layout.
async fn send_hs(
socket: &PeerSocket,
seq: u16,
ack_upto: u16,
msg_bytes: &[u8],
knock_key: Option<&[u8; 32]>,
) {
let knock_pad = if knock_key.is_some() { KNOCK_LEN } else { 0 };
let mut dg = Vec::with_capacity(knock_pad + HS_PREFIX_LEN + msg_bytes.len());
if let Some(key) = knock_key {
let token = knock_for_minute(key, current_unix_minute());
dg.extend_from_slice(&token);
}
dg.push(TYPE_HS); dg.push(TYPE_HS);
dg.extend_from_slice(&seq.to_be_bytes()); dg.extend_from_slice(&seq.to_be_bytes());
dg.extend_from_slice(&ack_upto.to_be_bytes()); dg.extend_from_slice(&ack_upto.to_be_bytes());
@@ -399,7 +580,14 @@ impl ReliableHsAdapter {
st.unacked.insert(seq, msg.clone()); st.unacked.insert(seq, msg.clone());
(seq, ack, msg) (seq, ack, msg)
}; };
Self::send_hs(&self.socket, to_send.0, to_send.1, &to_send.2).await; Self::send_hs(
&self.socket,
to_send.0,
to_send.1,
&to_send.2,
self.knock_key.as_ref(),
)
.await;
} }
} }
@@ -435,7 +623,7 @@ impl ReliableHsAdapter {
let st = self.state.lock().await; let st = self.state.lock().await;
(st.next_send_seq, st.ack_upto()) (st.next_send_seq, st.ack_upto())
}; };
Self::send_hs(&self.socket, seq, ack, &[]).await; Self::send_hs(&self.socket, seq, ack, &[], self.knock_key.as_ref()).await;
} }
/// Retransmit all currently-unacked HS datagrams (called on the RTO timer), each carrying the /// Retransmit all currently-unacked HS datagrams (called on the RTO timer), each carrying the
@@ -448,7 +636,7 @@ impl ReliableHsAdapter {
(st.ack_upto(), batch) (st.ack_upto(), batch)
}; };
for (seq, msg) in batch { for (seq, msg) in batch {
Self::send_hs(&self.socket, seq, ack, &msg).await; Self::send_hs(&self.socket, seq, ack, &msg, self.knock_key.as_ref()).await;
} }
} }
@@ -573,6 +761,7 @@ async fn run_reliable_handshake<F, Fut>(
socket: Arc<PeerSocket>, socket: Arc<PeerSocket>,
state: Arc<Mutex<HsState>>, state: Arc<Mutex<HsState>>,
opts: UdpOpts, opts: UdpOpts,
knock_key: Option<[u8; 32]>,
run_hs: F, run_hs: F,
) -> anyhow::Result<Established> ) -> anyhow::Result<Established>
where where
@@ -586,13 +775,20 @@ where
socket.clone(), socket.clone(),
state.clone(), state.clone(),
write_notify.clone(), write_notify.clone(),
knock_key,
)); ));
let writer = AdapterWrite(ReliableHsAdapter::new( let writer = AdapterWrite(ReliableHsAdapter::new(
socket.clone(), socket.clone(),
state.clone(), state.clone(),
write_notify.clone(), write_notify.clone(),
knock_key,
)); ));
let driver = ReliableHsAdapter::new(socket.clone(), state.clone(), write_notify.clone()); let driver = ReliableHsAdapter::new(
socket.clone(),
state.clone(),
write_notify.clone(),
knock_key,
);
let hs_fut = run_hs(reader, writer); let hs_fut = run_hs(reader, writer);
tokio::pin!(hs_fut); tokio::pin!(hs_fut);
@@ -710,16 +906,38 @@ impl AsyncWrite for AdapterWrite {
/// surfaces as an error. Late handshake retransmits (`0x01` HS datagrams) seen on the data path are /// surfaces as an error. Late handshake retransmits (`0x01` HS datagrams) seen on the data path are
/// dropped. Send and receive use **separate** [`tokio::sync::Mutex`]es, so the two directions run /// dropped. Send and receive use **separate** [`tokio::sync::Mutex`]es, so the two directions run
/// concurrently. /// concurrently.
///
/// When [`UdpOpts::cover_traffic_enabled`] is set, the constructor spawns a background task that
/// injects encrypted [`Frame::Ping`]s during idle periods (see the type-level "Cover traffic" docs
/// on [`UdpOpts`]); the task is `abort`ed on `Drop`.
pub struct UdpConnection { pub struct UdpConnection {
socket: Arc<PeerSocket>, socket: Arc<PeerSocket>,
sender: Mutex<DatagramSender>, sender: Arc<Mutex<DatagramSender>>,
receiver: Mutex<DatagramReceiver>, receiver: Mutex<DatagramReceiver>,
peer_id: Option<String>, peer_id: Option<String>,
opts: UdpOpts, opts: UdpOpts,
/// Wall-clock ms of the last datagram **we** emitted on the data path (DATA `0x02`). Updated by
/// [`PacketConnection::send_packet`] and by [`PacketConnection::recv_packet`] every time the
/// receive path emits a `Pong` reply, and read by the cover task to skip an attempt when the
/// link has not been idle. `Arc<AtomicU64>` so the cover task observes the same counter without
/// contending on the send mutex.
last_send_ms: Arc<AtomicU64>,
/// `Some` for server-side connections (keeps the [`UdpServer`]'s master loop alive past the /// `Some` for server-side connections (keeps the [`UdpServer`]'s master loop alive past the
/// server handle being dropped); `None` for client-side connections (the ephemeral /// server handle being dropped); `None` for client-side connections (the ephemeral
/// `connect()`ed socket lives inside the [`PeerSocket`] and needs no external task). /// `connect()`ed socket lives inside the [`PeerSocket`] and needs no external task).
_master_task: Option<Arc<MasterTask>>, _master_task: Option<Arc<MasterTask>>,
/// `Some` when [`UdpOpts::cover_traffic_enabled`] was set at construction; `Drop` aborts the
/// task so dropping the connection does not leak it. `None` keeps the old wire-silent behaviour.
_cover_task: Option<CoverTaskGuard>,
}
/// RAII guard that aborts the cover-traffic task on drop. Wrapping the `JoinHandle` keeps the
/// `Drop` impl trivial and avoids the temptation to leak it.
struct CoverTaskGuard(tokio::task::JoinHandle<()>);
impl Drop for CoverTaskGuard {
fn drop(&mut self) {
self.0.abort();
}
} }
impl UdpConnection { impl UdpConnection {
@@ -728,13 +946,29 @@ impl UdpConnection {
opts: UdpOpts, opts: UdpOpts,
master_task: Option<Arc<MasterTask>>, master_task: Option<Arc<MasterTask>>,
) -> Self { ) -> Self {
let sender = Arc::new(Mutex::new(est.sender));
// Seed the idle clock to *now* so the cover task's first attempt waits a full interval —
// we don't want a cover Ping firing on the same millisecond the connection establishes.
let last_send_ms = Arc::new(AtomicU64::new(unix_ms()));
let cover_task = if opts.cover_traffic_enabled {
Some(CoverTaskGuard(tokio::spawn(cover_traffic_loop(
est.socket.clone(),
sender.clone(),
last_send_ms.clone(),
opts,
))))
} else {
None
};
Self { Self {
socket: est.socket, socket: est.socket,
sender: Mutex::new(est.sender), sender,
receiver: Mutex::new(est.receiver), receiver: Mutex::new(est.receiver),
peer_id: est.peer_id, peer_id: est.peer_id,
opts, opts,
last_send_ms,
_master_task: master_task, _master_task: master_task,
_cover_task: cover_task,
} }
} }
@@ -752,6 +986,35 @@ impl UdpConnection {
} }
} }
/// Pack an already-sealed AEAD record into one DATA datagram (`0x02 || rec_len(u16) || rec`),
/// applying obfuscation padding to the next bucket of `padding_profile` if `obfuscate` is set.
///
/// Shared by [`PacketConnection::send_packet`], the Ping/Pong reply branch in
/// [`PacketConnection::recv_packet`], and the cover-traffic loop — they all produce identical
/// on-wire framing.
fn pack_data_datagram(rec: &[u8], obfuscate: bool, padding_profile: u8) -> Vec<u8> {
let rec_len = rec.len();
debug_assert!(
rec_len <= u16::MAX as usize,
"sealed record exceeds u16 len"
);
let mut dg = Vec::with_capacity(DATA_PREFIX_LEN + rec_len);
dg.push(TYPE_DATA);
dg.extend_from_slice(&(rec_len as u16).to_be_bytes());
dg.extend_from_slice(rec);
if obfuscate {
let target = padding::next_bucket_for_profile(dg.len(), padding_profile);
if target > dg.len() {
let pad = target - dg.len();
let mut pad_bytes = vec![0u8; pad];
use rand::RngCore;
rand::thread_rng().fill_bytes(&mut pad_bytes);
dg.extend_from_slice(&pad_bytes);
}
}
dg
}
#[async_trait] #[async_trait]
impl PacketConnection for UdpConnection { impl PacketConnection for UdpConnection {
async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> { async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> {
@@ -762,32 +1025,10 @@ impl PacketConnection for UdpConnection {
payload: Bytes::copy_from_slice(packet), payload: Bytes::copy_from_slice(packet),
}) })
}; };
let rec_len = rec.len(); let dg = pack_data_datagram(&rec, self.opts.obfuscate, self.opts.padding_profile);
debug_assert!(
rec_len <= u16::MAX as usize,
"sealed record exceeds u16 len"
);
let mut dg = Vec::with_capacity(DATA_PREFIX_LEN + rec_len);
dg.push(TYPE_DATA);
dg.extend_from_slice(&(rec_len as u16).to_be_bytes());
dg.extend_from_slice(&rec);
if self.opts.obfuscate {
// Pad the *whole datagram* up to the next size bucket of the configured padding
// profile (the daily mask picks the profile id). The receiver reads exactly `rec_len`
// of the sealed record and ignores the trailing pad bytes.
let target = padding::next_bucket_for_profile(dg.len(), self.opts.padding_profile);
if target > dg.len() {
let pad = target - dg.len();
let mut pad_bytes = vec![0u8; pad];
use rand::RngCore;
rand::thread_rng().fill_bytes(&mut pad_bytes);
dg.extend_from_slice(&pad_bytes);
}
}
self.socket.send_dgram(&dg).await?; self.socket.send_dgram(&dg).await?;
// Mark the link as non-idle so the cover-traffic loop skips its next attempt.
self.last_send_ms.store(unix_ms(), Ordering::Relaxed);
Ok(()) Ok(())
} }
@@ -824,11 +1065,14 @@ impl PacketConnection for UdpConnection {
let mut tx = self.sender.lock().await; let mut tx = self.sender.lock().await;
tx.seal(&Frame::Pong { seq }) tx.seal(&Frame::Pong { seq })
}; };
let mut out = Vec::with_capacity(DATA_PREFIX_LEN + rec.len()); let out = pack_data_datagram(
out.push(TYPE_DATA); &rec,
out.extend_from_slice(&(rec.len() as u16).to_be_bytes()); self.opts.obfuscate,
out.extend_from_slice(&rec); self.opts.padding_profile,
);
self.socket.send_dgram(&out).await?; self.socket.send_dgram(&out).await?;
// A Pong is just as good as a Data send for cover-traffic suppression.
self.last_send_ms.store(unix_ms(), Ordering::Relaxed);
} }
Frame::Pong { .. } => continue, Frame::Pong { .. } => continue,
Frame::Close { code, reason } => { Frame::Close { code, reason } => {
@@ -844,6 +1088,61 @@ impl PacketConnection for UdpConnection {
} }
} }
/// Background task body: emit encrypted [`Frame::Ping`] chaff during idle periods so the on-wire
/// byte rate stays roughly constant, masking user activity (typing, voice, idle).
///
/// One iteration:
/// 1. Sample a uniform delay in `mean * (1 ± jitter)` (clamped to ≥ 1 ms) and sleep that long.
/// 2. If we sent anything in the last `delay_ms` (the link was not idle), skip — user traffic
/// suppresses chaff one-for-one.
/// 3. Otherwise seal one `Frame::Ping { seq = random }` and ship it as a DATA datagram. The peer's
/// `recv_packet` answers with a Pong, which our `recv_packet` then drops on the floor — fully
/// invisible to the application layer.
///
/// The receiver-side cover work for the Pong reply happens on the **peer's** existing `recv_packet`
/// loop, not here — so this task spawns only an outbound writer; no extra reader is needed.
async fn cover_traffic_loop(
socket: Arc<PeerSocket>,
sender: Arc<Mutex<DatagramSender>>,
last_send_ms: Arc<AtomicU64>,
opts: UdpOpts,
) {
use rand::Rng;
// Defensive clamp: a misconfigured caller setting `mean = 0` would spin tight.
let mean = opts.cover_mean_interval_ms.max(1) as f64;
let j = opts.cover_jitter.clamp(0.0, 0.999) as f64;
loop {
// Uniform delay in [mean*(1-j), mean*(1+j)], floored at 1 ms.
let r: f64 = rand::thread_rng().gen_range(-1.0..=1.0);
let delay_ms = ((mean * (1.0 + r * j)).max(1.0)) as u64;
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
// Idle check: if any DATA datagram was emitted within the last `delay_ms`, the link is busy
// and chaff would just add overhead. Skip this round.
let now_ms = unix_ms();
let last = last_send_ms.load(Ordering::Relaxed);
if now_ms.saturating_sub(last) < delay_ms {
continue;
}
// Seal one Ping with a random seq and pack it as a DATA datagram.
let rec = {
let mut tx = sender.lock().await;
let seq: u32 = rand::thread_rng().gen();
tx.seal(&Frame::Ping { seq })
};
let dg = pack_data_datagram(&rec, opts.obfuscate, opts.padding_profile);
if let Err(e) = socket.send_dgram(&dg).await {
// A transient send failure (e.g. UnreachableHost during reconfig) is best-effort;
// log and keep trying. A permanent failure will be surfaced by the real send path.
tracing::debug!("cover-traffic send failed: {e}");
continue;
}
// Treat the cover send as "we sent something" so back-to-back ticks do not bunch up.
last_send_ms.store(now_ms, Ordering::Relaxed);
}
}
/// Per-peer inbox capacity in the server's master loop demuxer. /// Per-peer inbox capacity in the server's master loop demuxer.
/// ///
/// 128 datagrams is comfortably more than a single handshake flight (a handful of messages) /// 128 datagrams is comfortably more than a single handshake flight (a handful of messages)
@@ -1007,9 +1306,27 @@ async fn server_master_loop(
}; };
let dg = buf[..n].to_vec(); let dg = buf[..n].to_vec();
// Existing peer (handshake-in-progress OR established): hand it to that peer's inbox. // Cheap RwLock read per datagram so a runtime rotation of the knock key/flag takes effect
// for new traffic immediately.
let opts_now = *opts.read().await;
let knock_active = opts_now.knock_required && opts_now.knock_key.is_some();
// Existing peer (handshake-in-progress OR established): hand it to that peer's inbox,
// stripping the knock prefix on HS datagrams when knocking is on (the peer's adapter expects
// the plain `0x01 || ...` wire layout). DATA datagrams (`0x02`) and stray bytes are passed
// through unchanged so already-established connections keep working without the prefix.
if let Some(tx) = peers.get(&from) { if let Some(tx) = peers.get(&from) {
match tx.try_send(dg) { let routed = if knock_active {
strip_knock_for_known_peer(&dg)
} else {
Some(dg)
};
let Some(routed) = routed else {
// Malformed-when-knock-required (no `0x01` after stripping the 16-byte prefix and
// not a DATA datagram): silently drop, same as for unknown peers.
continue;
};
match tx.try_send(routed) {
Ok(()) => {} Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => { Err(mpsc::error::TrySendError::Full(_)) => {
tracing::warn!("udp inbox full for {from}, dropping datagram"); tracing::warn!("udp inbox full for {from}, dropping datagram");
@@ -1023,22 +1340,38 @@ async fn server_master_loop(
continue; continue;
} }
// Unknown source: only a leading HS byte is allowed to spawn a fresh peer. Late stray // Unknown source: only a leading HS byte (after optional knock stripping) may spawn a fresh
// data datagrams from sources we forgot are silently dropped. // peer. Late stray data datagrams from sources we forgot are silently dropped.
if dg.is_empty() || dg[0] != TYPE_HS { let first_hs_dg = if knock_active {
// `unwrap()` is safe under `knock_active` (it's set only when the key is `Some`).
let key = opts_now.knock_key.expect("knock_active implies a key");
match validate_and_strip_knock(&dg, &key) {
Some(stripped) => stripped,
None => {
// Silently drop — a probe never gets a reply or even a log at info level. UDP
// looks closed to scanners. Keep one debug line for legitimate operators.
tracing::debug!("udp port-knock failed from {from}; dropping (probe?)");
continue;
}
}
} else if dg.is_empty() || dg[0] != TYPE_HS {
continue; continue;
} } else {
dg
};
// Register the peer and pre-load the inbox with its first datagram so the spawned // Register the peer and pre-load the inbox with its first (post-knock-strip) datagram so
// handshake task picks it up on its first `recv_dgram`. // the spawned handshake task picks it up on its first `recv_dgram`.
let (inbox_tx, inbox_rx) = mpsc::channel::<Vec<u8>>(PEER_INBOX_CAPACITY); let (inbox_tx, inbox_rx) = mpsc::channel::<Vec<u8>>(PEER_INBOX_CAPACITY);
// Capacity > 0, so this `try_send` cannot fail; ignore the result defensively. // Capacity > 0, so this `try_send` cannot fail; ignore the result defensively.
let _ = inbox_tx.try_send(dg); let _ = inbox_tx.try_send(first_hs_dg);
peers.insert(from, inbox_tx); peers.insert(from, inbox_tx);
// Snapshot opts for this peer's lifetime so a concurrent rotation does not change wire // Snapshot opts for this peer's lifetime so a concurrent rotation does not change wire
// behaviour mid-handshake (matches the single-peer impl's contract). // behaviour mid-handshake (matches the single-peer impl's contract). We already snapshotted
let opts_snap = *opts.read().await; // at the top of the loop iteration for the knock check; reuse that exact value so the
// routing decision and the spawned task agree.
let opts_snap = opts_now;
let cfg = proto_cfg.clone(); let cfg = proto_cfg.clone();
let master_for_peer = master.clone(); let master_for_peer = master.clone();
let acc = accept_tx.clone(); let acc = accept_tx.clone();
@@ -1052,12 +1385,19 @@ async fn server_master_loop(
}, },
}); });
let state = Arc::new(Mutex::new(HsState::new())); let state = Arc::new(Mutex::new(HsState::new()));
let result = // Server never knock-prefixes its outgoing HS datagrams (only the client does — see the
run_reliable_handshake(peer_socket, state, opts_snap, move |r, w| async move { // `Probe resistance` docs on `UdpOpts`). Pass `None` regardless of `opts_snap.knock_*`.
let result = run_reliable_handshake(
peer_socket,
state,
opts_snap,
None,
move |r, w| async move {
let session = server_handshake(r, w, &cfg).await?; let session = server_handshake(r, w, &cfg).await?;
Ok(session.into_datagram_parts()) Ok(session.into_datagram_parts())
}) },
.await; )
.await;
match result { match result {
Ok(est) => { Ok(est) => {
// Pin the master task alive while this connection lives: upgrading `Weak` // Pin the master task alive while this connection lives: upgrading `Weak`
@@ -1123,10 +1463,23 @@ impl UdpClient {
// Fresh (unseeded) state: the client speaks first (ClientHello). // Fresh (unseeded) state: the client speaks first (ClientHello).
let state = Arc::new(Mutex::new(HsState::new())); let state = Arc::new(Mutex::new(HsState::new()));
let est = run_reliable_handshake(peer_socket, state, opts, move |r, w| async move { // Client knocks if (and only if) BOTH `knock_required` is set AND a key was supplied; this
let session = client_handshake(r, w, &proto_cfg).await?; // matches the server's accept policy: missing key on either side ⇒ knocking effectively off.
Ok(session.into_datagram_parts()) let knock_key = if opts.knock_required {
}) opts.knock_key
} else {
None
};
let est = run_reliable_handshake(
peer_socket,
state,
opts,
knock_key,
move |r, w| async move {
let session = client_handshake(r, w, &proto_cfg).await?;
Ok(session.into_datagram_parts())
},
)
.await?; .await?;
// Client side has no master loop to keep alive — the ephemeral connected socket lives in // Client side has no master loop to keep alive — the ephemeral connected socket lives in
@@ -1269,4 +1622,144 @@ mod tests {
let msg_b: Vec<u8> = st.out_partial.drain(..total).collect(); let msg_b: Vec<u8> = st.out_partial.drain(..total).collect();
assert_eq!(msg_b, b); assert_eq!(msg_b, b);
} }
// -- Port-knocking helpers -----------------------------------------------------------------
/// A constant 32-byte key shared by the unit tests below.
fn test_key() -> [u8; 32] {
let mut k = [0u8; 32];
for (i, b) in k.iter_mut().enumerate() {
*b = i as u8;
}
k
}
/// Build a knocked HS datagram for an arbitrary minute, with a trivial trailing payload. The
/// test cares only about the prefix-validation logic, not the wrapped HS message.
fn make_knocked_hs(key: &[u8; 32], minute: u64) -> Vec<u8> {
let token = knock_for_minute(key, minute);
let mut dg = Vec::with_capacity(KNOCK_LEN + HS_PREFIX_LEN + 8);
dg.extend_from_slice(&token);
dg.push(TYPE_HS);
dg.extend_from_slice(&0u16.to_be_bytes()); // hs_seq = 0
dg.extend_from_slice(&ACK_NONE.to_be_bytes()); // ack = none
dg.extend_from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]);
dg
}
#[test]
fn knock_for_minute_is_deterministic_and_minute_sensitive() {
let k = test_key();
// Same input → same output.
assert_eq!(
knock_for_minute(&k, 1_000_000),
knock_for_minute(&k, 1_000_000)
);
// Different minute → different output.
assert_ne!(
knock_for_minute(&k, 1_000_000),
knock_for_minute(&k, 1_000_001)
);
// Different key → different output.
let mut k2 = k;
k2[0] ^= 1;
assert_ne!(
knock_for_minute(&k, 1_000_000),
knock_for_minute(&k2, 1_000_000)
);
}
#[test]
fn udp_knock_tolerates_clock_skew() {
// Cover the spec test name: a datagram knocked for `now-1` / `now+1` must still validate at
// the server, but `now-2` / `now+2` must NOT (window is ±1 minute).
let key = test_key();
let now = current_unix_minute();
for minute in [now, now.saturating_sub(1), now.saturating_add(1)] {
let dg = make_knocked_hs(&key, minute);
let stripped = validate_and_strip_knock(&dg, &key).unwrap_or_else(|| {
panic!("expected validation pass for minute {minute} (now={now})")
});
assert_eq!(
stripped[0], TYPE_HS,
"first byte after strip must be the HS type",
);
// The stripped tail is exactly the original datagram minus the 16-byte prefix.
assert_eq!(stripped, &dg[KNOCK_LEN..]);
}
// Two minutes away (in either direction) must fail.
for minute in [now.saturating_sub(2), now.saturating_add(2)] {
let dg = make_knocked_hs(&key, minute);
assert!(
validate_and_strip_knock(&dg, &key).is_none(),
"minute {minute} (now={now}) should fall outside the ±1 acceptance window",
);
}
// Garbage prefix never validates.
let mut bad = make_knocked_hs(&key, now);
bad[0] ^= 0xFF;
assert!(
validate_and_strip_knock(&bad, &key).is_none(),
"tampered knock must fail"
);
// Wrong layout: missing `0x01` after the 16 bytes — must fail (and not panic).
let mut short = vec![0u8; KNOCK_LEN]; // 16 zero bytes
short.push(0xAA); // not TYPE_HS
assert!(validate_and_strip_knock(&short, &key).is_none());
// Too short overall: must fail without panic.
let tiny = vec![0u8; KNOCK_LEN - 1];
assert!(validate_and_strip_knock(&tiny, &key).is_none());
}
#[test]
fn known_peer_strip_handles_data_and_hs_paths() {
// DATA datagrams are passed through unchanged (no knock prefix on the data path).
let data = vec![TYPE_DATA, 0x00, 0x05, 1, 2, 3, 4, 5];
assert_eq!(strip_knock_for_known_peer(&data), Some(data.clone()));
// HS with a 16-byte (any-bytes) prefix is stripped without validation.
let mut hs = vec![0xCDu8; KNOCK_LEN];
hs.push(TYPE_HS);
hs.extend_from_slice(&[0, 0, 0xFF, 0xFF, 9, 9, 9]);
let stripped = strip_knock_for_known_peer(&hs).expect("known-peer strip succeeds");
assert_eq!(stripped[0], TYPE_HS);
assert_eq!(stripped, hs[KNOCK_LEN..]);
// Empty: dropped.
assert!(strip_knock_for_known_peer(&[]).is_none());
// Junk: dropped.
let junk = vec![0xFFu8; 32];
assert!(strip_knock_for_known_peer(&junk).is_none());
}
// -- Cover-traffic packing ------------------------------------------------------------------
#[test]
fn pack_data_datagram_layout_no_obfuscate() {
let rec = [1u8, 2, 3, 4, 5];
let dg = pack_data_datagram(&rec, false, 0);
assert_eq!(dg[0], TYPE_DATA);
assert_eq!(u16::from_be_bytes([dg[1], dg[2]]) as usize, rec.len());
assert_eq!(&dg[DATA_PREFIX_LEN..], &rec);
// No padding when obfuscate is off.
assert_eq!(dg.len(), DATA_PREFIX_LEN + rec.len());
}
#[test]
fn pack_data_datagram_pads_when_obfuscate_set() {
let rec = [0u8; 10];
let dg = pack_data_datagram(&rec, true, 0);
// Padded up to at least the next bucket; the canonical buckets start above 10 + 3 = 13.
assert!(
dg.len() >= DATA_PREFIX_LEN + rec.len(),
"padded datagram is at least the minimum encoded length",
);
// Header is still correct (rec_len is unchanged, padding is appended).
assert_eq!(u16::from_be_bytes([dg[1], dg[2]]) as usize, rec.len());
}
} }
+330
View File
@@ -0,0 +1,330 @@
//! Integration tests for the UDP **cover-traffic** (idle-chaff) feature.
//!
//! These exercise the end-to-end behaviour of [`UdpOpts::cover_traffic_enabled`] /
//! [`UdpOpts::cover_mean_interval_ms`] / [`UdpOpts::cover_jitter`]:
//!
//! * [`udp_cover_traffic_fires_on_idle`] — cover enabled on the client; with no user data, the
//! server's `recv_packet` returns at least one Pong-flavoured artefact (a Pong sealed by the
//! server in response to the client's cover Ping is what the client would have seen if it called
//! `recv_packet`). We instead drive a known-payload round trip *after* an idle window to assert
//! that things keep working even when the cover task is running.
//! * [`udp_cover_traffic_skipped_under_load`] — drive 50 packets in ≈1 second; the cover task must
//! skip every attempt (link is non-idle), so the total number of datagrams the server observes is
//! ≈ 50, not noticeably more.
//! * [`udp_cover_traffic_disabled_back_compat`] — defaults give exactly the pre-feature wire
//! silence: no extra Pings, no extra Pongs after a 1 s idle window.
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use aura_pki::AuraCa;
use aura_proto::{ClientConfig, PacketConnection, ServerConfig};
use aura_transport::{UdpClient, UdpConnection, UdpOpts, UdpServer};
use tokio::net::UdpSocket;
const SERVER_NAME: &str = "localhost";
const CLIENT_ID: &str = "client-cover";
fn make_configs() -> (ServerConfig, ClientConfig) {
let ca = AuraCa::generate("Aura UDP Cover Test CA").expect("generate CA");
let server_cert = ca
.issue_server_cert(SERVER_NAME)
.expect("issue server cert");
let client_cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert");
let ca_pem = ca.ca_cert_pem();
let server_cfg = ServerConfig {
ca_cert_pem: ca_pem.clone(),
server_cert_pem: server_cert.cert_pem,
server_key_pem: server_cert.key_pem,
};
let client_cfg = ClientConfig {
ca_cert_pem: ca_pem,
client_cert_pem: client_cert.cert_pem,
client_key_pem: client_cert.key_pem,
server_name: SERVER_NAME.to_string(),
};
(server_cfg, client_cfg)
}
/// A datagram-counting forwarder that relays UDP between a "front" (clients connect here) and a
/// "back" (real server), counting each direction. We use this to observe wire-level traffic
/// directly — cover-Ping → cover-Pong both pass through, so the counters reflect total chaff.
async fn spawn_counting_forwarder(
server_addr: SocketAddr,
) -> (SocketAddr, Arc<AtomicU64>, Arc<AtomicU64>) {
let front = UdpSocket::bind("127.0.0.1:0").await.expect("bind front");
let back = UdpSocket::bind("127.0.0.1:0").await.expect("bind back");
let front_addr = front.local_addr().expect("front addr");
let front = Arc::new(front);
let back = Arc::new(back);
let c2s = Arc::new(AtomicU64::new(0));
let s2c = Arc::new(AtomicU64::new(0));
let client_addr: Arc<tokio::sync::Mutex<Option<SocketAddr>>> =
Arc::new(tokio::sync::Mutex::new(None));
// Client -> Server.
{
let front = front.clone();
let back = back.clone();
let c2s = c2s.clone();
let client_addr = client_addr.clone();
tokio::spawn(async move {
let mut buf = vec![0u8; 4096];
loop {
let (n, from) = match front.recv_from(&mut buf).await {
Ok(v) => v,
Err(_) => continue,
};
{
let mut ca = client_addr.lock().await;
if ca.is_none() {
*ca = Some(from);
}
}
c2s.fetch_add(1, Ordering::Relaxed);
let _ = back.send_to(&buf[..n], server_addr).await;
}
});
}
// Server -> Client.
{
let front = front.clone();
let back = back.clone();
let s2c = s2c.clone();
let client_addr = client_addr.clone();
tokio::spawn(async move {
let mut buf = vec![0u8; 4096];
loop {
let (n, _) = match back.recv_from(&mut buf).await {
Ok(v) => v,
Err(_) => continue,
};
let dest = { *client_addr.lock().await };
if let Some(dest) = dest {
s2c.fetch_add(1, Ordering::Relaxed);
let _ = front.send_to(&buf[..n], dest).await;
}
}
});
}
(front_addr, c2s, s2c)
}
#[tokio::test]
async fn udp_cover_traffic_fires_on_idle() {
let (server_cfg, client_cfg) = make_configs();
// Tight cover interval so the test does not have to wait long for chaff to show up.
let opts = UdpOpts {
cover_traffic_enabled: true,
cover_mean_interval_ms: 100,
cover_jitter: 0.2,
..UdpOpts::default()
};
let server =
UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server");
let server_addr = server.local_addr().expect("server addr");
// Put the counting forwarder between client and server so we can see chaff on the wire.
let (proxy_addr, c2s, s2c) = spawn_counting_forwarder(server_addr).await;
let accept_task = tokio::spawn(async move { server.accept().await });
let connect_task =
tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await });
let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task)
.await
.expect("accept timely")
.expect("accept join")
.expect("server accept");
let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task)
.await
.expect("connect timely")
.expect("connect join")
.expect("client connect");
let server_conn: Arc<dyn PacketConnection> = Arc::new(server_conn);
let client_conn: Arc<dyn PacketConnection> = Arc::new(client_conn);
// Snapshot counters after the handshake completes, then go idle for ~1.5 s.
let c2s_after_hs = c2s.load(Ordering::Relaxed);
let s2c_after_hs = s2c.load(Ordering::Relaxed);
// Spawn a recv loop on the server side so it actually drives its read path (and replies to
// Pings). Without this, the client's cover Pings would sit in the OS socket buffer.
let server_for_recv = server_conn.clone();
let recv_task = tokio::spawn(async move {
for _ in 0..20 {
// Each call returns on Data; cover Pings are answered internally without ever
// returning. So if we hit a timeout, that's expected: cover traffic does NOT surface
// as data. We're really just running the loop so the server processes Pings.
let _ = tokio::time::timeout(Duration::from_millis(300), server_for_recv.recv_packet())
.await;
}
});
// Mirror on the client side: keep its recv path active so it consumes Pongs (otherwise the
// OS recv buffer fills with Pongs and the test sees them only on the wire counter, but the
// cover-task's own state stays clean).
let client_for_recv = client_conn.clone();
let recv_task_c = tokio::spawn(async move {
for _ in 0..20 {
let _ = tokio::time::timeout(Duration::from_millis(300), client_for_recv.recv_packet())
.await;
}
});
tokio::time::sleep(Duration::from_millis(1500)).await;
let c2s_idle = c2s.load(Ordering::Relaxed);
let s2c_idle = s2c.load(Ordering::Relaxed);
let c2s_chaff = c2s_idle.saturating_sub(c2s_after_hs);
let s2c_chaff = s2c_idle.saturating_sub(s2c_after_hs);
// With cover_mean_interval_ms = 100 over ~1.5 s, we should see > 0 chaff datagrams in each
// direction (client sends Pings; server replies with Pongs). We do not pin a tight bound to
// avoid flake on slow CI; even one chaff datagram per direction proves the feature is firing.
assert!(
c2s_chaff >= 1,
"expected at least one cover-traffic Ping client→server, got {c2s_chaff} (handshake baseline {c2s_after_hs})",
);
assert!(
s2c_chaff >= 1,
"expected at least one cover-traffic Pong server→client, got {s2c_chaff} (handshake baseline {s2c_after_hs})",
);
drop(recv_task);
drop(recv_task_c);
}
#[tokio::test]
async fn udp_cover_traffic_skipped_under_load() {
let (server_cfg, client_cfg) = make_configs();
// Cover-task mean interval ~50 ms; user traffic must suppress chaff one-for-one because each
// send updates `last_send_ms`.
let opts = UdpOpts {
cover_traffic_enabled: true,
cover_mean_interval_ms: 50,
cover_jitter: 0.1,
..UdpOpts::default()
};
let server =
UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server");
let server_addr = server.local_addr().expect("server addr");
let (proxy_addr, c2s, _s2c) = spawn_counting_forwarder(server_addr).await;
let accept_task = tokio::spawn(async move { server.accept().await });
let connect_task =
tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await });
let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task)
.await
.expect("accept timely")
.expect("accept join")
.expect("server accept");
let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task)
.await
.expect("connect timely")
.expect("connect join")
.expect("client connect");
let server_conn: Arc<dyn PacketConnection> = Arc::new(server_conn);
let client_conn: Arc<dyn PacketConnection> = Arc::new(client_conn);
// Drain the server in the background so it actually processes incoming packets (and Pings).
let server_for_recv = server_conn.clone();
let recv_task = tokio::spawn(async move {
let mut got = 0u32;
while let Ok(Ok(_)) =
tokio::time::timeout(Duration::from_millis(2000), server_for_recv.recv_packet()).await
{
got += 1;
if got >= 50 {
break;
}
}
got
});
let c2s_after_hs = c2s.load(Ordering::Relaxed);
// Drive 50 user packets over ≈1 s. Each send updates last_send_ms, so the cover task's idle
// check (`now - last < delay_ms`) is true on every iteration and chaff is skipped.
let total = 50u32;
let pkt = vec![0xABu8; 100];
let start = std::time::Instant::now();
for _ in 0..total {
client_conn.send_packet(&pkt).await.expect("client send");
// Pace the sends to ~1 packet every 20 ms (50 packets in 1 s); spacing < mean interval
// (50 ms) so the suppression check always wins.
tokio::time::sleep(Duration::from_millis(20)).await;
}
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_secs(3),
"loop should finish under 3 s, took {elapsed:?}",
);
// Give the server a moment to drain.
let _ = tokio::time::timeout(Duration::from_secs(3), recv_task).await;
let c2s_data = c2s.load(Ordering::Relaxed).saturating_sub(c2s_after_hs);
// Expect ≈ 50 datagrams from client to server (the user packets). Allow a small slack for one
// straggler cover Ping if a sleep wakes up just slightly late; tightly bound at ≤ 60.
assert!(
c2s_data >= total as u64,
"client must have sent at least {total} user packets, observed {c2s_data}",
);
assert!(
c2s_data <= (total as u64) + 10,
"cover-traffic should be suppressed under load, but observed {c2s_data} datagrams (expected ≈ {total})",
);
}
#[tokio::test]
async fn udp_cover_traffic_disabled_back_compat() {
let (server_cfg, client_cfg) = make_configs();
let opts = UdpOpts::default(); // cover_traffic_enabled: false
let server =
UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind server");
let server_addr = server.local_addr().expect("server addr");
let (proxy_addr, c2s, s2c) = spawn_counting_forwarder(server_addr).await;
let accept_task = tokio::spawn(async move { server.accept().await });
let connect_task =
tokio::spawn(async move { UdpClient::connect(proxy_addr, client_cfg, opts).await });
let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task)
.await
.expect("accept timely")
.expect("accept join")
.expect("server accept");
let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task)
.await
.expect("connect timely")
.expect("connect join")
.expect("client connect");
// After the handshake, both sides go fully idle for 1 s. Nothing must hit the wire.
let c2s_after_hs = c2s.load(Ordering::Relaxed);
let s2c_after_hs = s2c.load(Ordering::Relaxed);
// Hold references so the connections do not drop early.
let _hold_server = server_conn;
let _hold_client = client_conn;
tokio::time::sleep(Duration::from_secs(1)).await;
assert_eq!(
c2s.load(Ordering::Relaxed),
c2s_after_hs,
"no client→server chaff with cover disabled",
);
assert_eq!(
s2c.load(Ordering::Relaxed),
s2c_after_hs,
"no server→client chaff with cover disabled",
);
}
+193
View File
@@ -0,0 +1,193 @@
//! Integration tests for the UDP **port-knocking** (probe resistance) feature.
//!
//! These exercise the end-to-end behaviour of [`UdpOpts::knock_required`] / [`UdpOpts::knock_key`]:
//!
//! * [`udp_knock_required_silent_drop_on_missing_or_wrong`] — server requires knocking; client does
//! not knock → server stays silent (no reply within 1 s, so a passive scanner sees a closed port).
//! * [`udp_knock_required_accepts_valid`] — both sides knock with the same key → handshake completes
//! like usual; the inner Aura mutual auth still drives the auth decision.
//! * [`udp_knock_disabled_back_compat`] — both sides disabled → exact pre-feature wire behaviour.
//!
//! The clock-skew tolerance test (±1 minute) lives as a unit test inside `src/udp.rs` so it can
//! drive [`validate_and_strip_knock`] directly with a faked "now" — much sharper than racing the
//! wall clock here.
use std::sync::Arc;
use std::time::Duration;
use aura_pki::AuraCa;
use aura_proto::{ClientConfig, PacketConnection, ServerConfig};
use aura_transport::{UdpClient, UdpConnection, UdpOpts, UdpServer};
use tokio::net::UdpSocket;
const SERVER_NAME: &str = "localhost";
const CLIENT_ID: &str = "client-knock";
/// Mint CA + server + client cert/key triples, returning matching handshake configs.
fn make_configs() -> (ServerConfig, ClientConfig) {
let ca = AuraCa::generate("Aura UDP Knock Test CA").expect("generate CA");
let server_cert = ca
.issue_server_cert(SERVER_NAME)
.expect("issue server cert");
let client_cert = ca.issue_client_cert(CLIENT_ID).expect("issue client cert");
let ca_pem = ca.ca_cert_pem();
let server_cfg = ServerConfig {
ca_cert_pem: ca_pem.clone(),
server_cert_pem: server_cert.cert_pem,
server_key_pem: server_cert.key_pem,
};
let client_cfg = ClientConfig {
ca_cert_pem: ca_pem,
client_cert_pem: client_cert.cert_pem,
client_key_pem: client_cert.key_pem,
server_name: SERVER_NAME.to_string(),
};
(server_cfg, client_cfg)
}
/// A 32-byte test knock key; in production this is `SHA-256(CA-cert-DER)` (the CLI computes it),
/// but for the transport-level tests any well-known constant is fine — both sides just need the
/// same bytes.
fn test_knock_key() -> [u8; 32] {
let mut k = [0u8; 32];
for (i, b) in k.iter_mut().enumerate() {
*b = (i as u8).wrapping_mul(13).wrapping_add(7);
}
k
}
#[tokio::test]
async fn udp_knock_required_silent_drop_on_missing_or_wrong() {
let (server_cfg, _client_cfg) = make_configs();
// Server: require knocking with our test key. Tighten the handshake timeout so the test does
// not have to wait the default 10 s for the (never-arriving) handshake.
let server_opts = UdpOpts {
knock_required: true,
knock_key: Some(test_knock_key()),
hs_timeout: Duration::from_secs(2),
..UdpOpts::default()
};
let server = UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, server_opts)
.expect("bind udp server");
let server_addr = server.local_addr().expect("server local_addr");
// Bind a raw client socket and send a single *unknocked* HS-shaped datagram. We do NOT run
// `UdpClient::connect` here because that would inject the proto handshake's ClientHello and we
// want to assert "the server is silent at the wire level".
let raw_client = UdpSocket::bind("127.0.0.1:0")
.await
.expect("bind raw client");
raw_client.connect(server_addr).await.expect("raw connect");
// Wire layout the server expects when knock is OFF: 0x01 (HS) || hs_seq(2) || ack(2) || msg.
// No knock prefix → the server's master loop must drop this silently.
let mut unknocked_hs = vec![0x01u8, 0x00, 0x00, 0xFF, 0xFF];
// Append some plausible-looking handshake-message bytes so the datagram is non-trivially sized.
unknocked_hs.extend_from_slice(&[0u8; 64]);
raw_client
.send(&unknocked_hs)
.await
.expect("send unknocked HS");
// The server must NOT reply. Wait 1 s for any inbound datagram; recv_from must time out.
let mut buf = [0u8; 1024];
let res = tokio::time::timeout(Duration::from_secs(1), raw_client.recv(&mut buf)).await;
assert!(
res.is_err(),
"server replied to an unknocked HS datagram (got {} bytes), expected wire silence",
res.unwrap_or(Ok(0)).unwrap_or(0),
);
// Cleanup: drop the server explicitly (also tears down the master loop).
drop(server);
}
#[tokio::test]
async fn udp_knock_required_accepts_valid() {
let (server_cfg, client_cfg) = make_configs();
let key = test_knock_key();
let opts = UdpOpts {
knock_required: true,
knock_key: Some(key),
..UdpOpts::default()
};
let server =
UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server");
let server_addr = server.local_addr().expect("server local_addr");
let accept_task = tokio::spawn(async move { server.accept().await });
let connect_task =
tokio::spawn(async move { UdpClient::connect(server_addr, client_cfg, opts).await });
let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task)
.await
.expect("server accept timely")
.expect("accept join")
.expect("server accept");
let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task)
.await
.expect("client connect timely")
.expect("connect join")
.expect("client connect");
assert_eq!(
server_conn.peer_id(),
Some(CLIENT_ID),
"server learned client CN — handshake completed through knocking",
);
// Round-trip a packet both ways to prove the data path also works under knocking.
let server_conn: Arc<dyn PacketConnection> = Arc::new(server_conn);
let client_conn: Arc<dyn PacketConnection> = Arc::new(client_conn);
client_conn
.send_packet(b"knock knock")
.await
.expect("client send");
let got = tokio::time::timeout(Duration::from_secs(5), server_conn.recv_packet())
.await
.expect("server recv timely")
.expect("server recv");
assert_eq!(got, b"knock knock");
}
#[tokio::test]
async fn udp_knock_disabled_back_compat() {
let (server_cfg, client_cfg) = make_configs();
let opts = UdpOpts::default(); // knock_required: false, knock_key: None
let server =
UdpServer::bind("127.0.0.1:0".parse().unwrap(), server_cfg, opts).expect("bind udp server");
let server_addr = server.local_addr().expect("server local_addr");
let accept_task = tokio::spawn(async move { server.accept().await });
let connect_task =
tokio::spawn(async move { UdpClient::connect(server_addr, client_cfg, opts).await });
let server_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), accept_task)
.await
.expect("server accept timely")
.expect("accept join")
.expect("server accept");
let client_conn: UdpConnection = tokio::time::timeout(Duration::from_secs(15), connect_task)
.await
.expect("client connect timely")
.expect("connect join")
.expect("client connect");
assert_eq!(server_conn.peer_id(), Some(CLIENT_ID));
let server_conn: Arc<dyn PacketConnection> = Arc::new(server_conn);
let client_conn: Arc<dyn PacketConnection> = Arc::new(client_conn);
client_conn
.send_packet(b"no-knock")
.await
.expect("client send");
let got = tokio::time::timeout(Duration::from_secs(5), server_conn.recv_packet())
.await
.expect("server recv timely")
.expect("server recv");
assert_eq!(got, b"no-knock");
}