feat(transport): TCP/443 fallback + unified dialer with UDP->TCP->QUIC handover
- tcp.rs: Aura proto handshake + Session directly over TcpStream (TcpServer/
TcpClient/TcpConnection: PacketConnection), with an optional light HTTP/1.1
masquerade preamble. Fallback for UDP-blocking networks. (Full TLS-443 mimicry
is a documented follow-up.)
- dial.rs: TransportMode {Udp,Tcp,Quic}, Endpoints, DialConfig; client `dial()`
tries transports in order and hands over on failure/timeout; MultiServer binds
and accepts on every enabled transport at once (TCP/QUIC multi-client; UDP
single-peer-per-accept in v1).
- Tests: tcp loopback (plain + masquerade), dial handover (dead TCP -> UDP).
clippy/fmt clean.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,297 @@
|
||||
//! Unified transport selection: a client [`dial`] that tries transports in order (UDP → TCP →
|
||||
//! QUIC, the "handover") and a [`MultiServer`] that accepts on every enabled transport at once.
|
||||
//!
|
||||
//! All three backends produce an `Arc<dyn aura_proto::PacketConnection>`, so the tunnel router does
|
||||
//! not care which transport carried a connection. The primary path is Aura's own UDP transport;
|
||||
//! TCP/443 and QUIC are fallbacks for networks that throttle or block plain UDP.
|
||||
|
||||
use std::fmt;
|
||||
use std::net::SocketAddr;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use aura_proto::{ClientConfig, PacketConnection, ServerConfig};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::{AuraClient, AuraServer, TcpClient, TcpOpts, TcpServer, UdpClient, UdpOpts, UdpServer};
|
||||
|
||||
/// Which wire transport carries an Aura connection.
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum TransportMode {
|
||||
/// Aura's own protocol over plain UDP (primary).
|
||||
Udp,
|
||||
/// Aura over TCP (fallback for UDP-blocking networks; optional HTTP masquerade).
|
||||
Tcp,
|
||||
/// Aura inside QUIC/HTTP3 mimicry (fallback / strong camouflage).
|
||||
Quic,
|
||||
}
|
||||
|
||||
impl FromStr for TransportMode {
|
||||
type Err = String;
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
match s.trim().to_ascii_lowercase().as_str() {
|
||||
"udp" => Ok(Self::Udp),
|
||||
"tcp" => Ok(Self::Tcp),
|
||||
"quic" => Ok(Self::Quic),
|
||||
other => Err(format!("unknown transport '{other}' (expected udp|tcp|quic)")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for TransportMode {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(match self {
|
||||
Self::Udp => "udp",
|
||||
Self::Tcp => "tcp",
|
||||
Self::Quic => "quic",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Per-transport server addresses. Any subset may be set; `None` means that transport is disabled.
|
||||
///
|
||||
/// The UDP transport and QUIC both use UDP and therefore must use *different* ports; TCP can share a
|
||||
/// port number with the UDP transport (different protocol).
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct Endpoints {
|
||||
/// Address of the custom-UDP transport.
|
||||
pub udp: Option<SocketAddr>,
|
||||
/// Address of the TCP transport.
|
||||
pub tcp: Option<SocketAddr>,
|
||||
/// Address of the QUIC transport.
|
||||
pub quic: Option<SocketAddr>,
|
||||
}
|
||||
|
||||
/// Client-side dial configuration: where the server is per transport, the fallback order, and
|
||||
/// per-transport options.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct DialConfig {
|
||||
/// Server addresses per transport.
|
||||
pub endpoints: Endpoints,
|
||||
/// SNI / masquerade hostname (QUIC outer SNI; TCP masquerade Host).
|
||||
pub sni: String,
|
||||
/// Transports to try, in order. The first that connects wins.
|
||||
pub order: Vec<TransportMode>,
|
||||
/// Options for the UDP transport.
|
||||
pub udp: UdpOpts,
|
||||
/// Options for the TCP transport.
|
||||
pub tcp: TcpOpts,
|
||||
/// Per-attempt timeout before moving to the next transport in `order`.
|
||||
pub attempt_timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for DialConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
endpoints: Endpoints::default(),
|
||||
sni: "cdn.example.com".to_string(),
|
||||
order: vec![TransportMode::Udp, TransportMode::Tcp, TransportMode::Quic],
|
||||
udp: UdpOpts::default(),
|
||||
tcp: TcpOpts::default(),
|
||||
attempt_timeout: Duration::from_secs(8),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Connect to the server, trying each transport in `cfg.order` until one succeeds ("handover").
|
||||
///
|
||||
/// Returns the established connection and which transport carried it. A transport with no configured
|
||||
/// address is skipped; a transport that errors or times out moves on to the next.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns the last error if every configured transport fails (or an error if none were configured).
|
||||
pub async fn dial(
|
||||
proto_cfg: ClientConfig,
|
||||
cfg: DialConfig,
|
||||
) -> anyhow::Result<(Arc<dyn PacketConnection>, TransportMode)> {
|
||||
let mut last_err: Option<anyhow::Error> = None;
|
||||
for mode in &cfg.order {
|
||||
let addr = match mode {
|
||||
TransportMode::Udp => cfg.endpoints.udp,
|
||||
TransportMode::Tcp => cfg.endpoints.tcp,
|
||||
TransportMode::Quic => cfg.endpoints.quic,
|
||||
};
|
||||
let Some(addr) = addr else {
|
||||
continue; // transport not configured
|
||||
};
|
||||
tracing::info!("dial: trying {mode} at {addr}");
|
||||
let attempt =
|
||||
tokio::time::timeout(cfg.attempt_timeout, dial_one(*mode, addr, &proto_cfg, &cfg)).await;
|
||||
match attempt {
|
||||
Ok(Ok(conn)) => {
|
||||
tracing::info!("dial: connected via {mode}");
|
||||
return Ok((conn, *mode));
|
||||
}
|
||||
Ok(Err(e)) => {
|
||||
tracing::warn!("dial: {mode} failed: {e:#}");
|
||||
last_err = Some(e);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::warn!("dial: {mode} timed out after {:?}", cfg.attempt_timeout);
|
||||
last_err = Some(anyhow::anyhow!("{mode} connect timed out"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("no transports configured to dial")))
|
||||
}
|
||||
|
||||
async fn dial_one(
|
||||
mode: TransportMode,
|
||||
addr: SocketAddr,
|
||||
proto_cfg: &ClientConfig,
|
||||
cfg: &DialConfig,
|
||||
) -> anyhow::Result<Arc<dyn PacketConnection>> {
|
||||
Ok(match mode {
|
||||
TransportMode::Udp => UdpClient::connect(addr, proto_cfg.clone(), cfg.udp)
|
||||
.await?
|
||||
.into_dyn(),
|
||||
TransportMode::Tcp => TcpClient::connect(addr, proto_cfg.clone(), cfg.tcp.clone())
|
||||
.await?
|
||||
.into_dyn(),
|
||||
TransportMode::Quic => AuraClient::connect(addr, &cfg.sni, proto_cfg.clone())
|
||||
.await?
|
||||
.into_dyn(),
|
||||
})
|
||||
}
|
||||
|
||||
/// An accepted connection plus which transport carried it and the verified peer id.
|
||||
pub struct Accepted {
|
||||
/// The established packet pipe.
|
||||
pub conn: Arc<dyn PacketConnection>,
|
||||
/// Which transport accepted it.
|
||||
pub mode: TransportMode,
|
||||
/// Verified peer Common Name (client id), if any.
|
||||
pub peer_id: Option<String>,
|
||||
}
|
||||
|
||||
/// A server that listens on every enabled transport simultaneously and yields accepted connections
|
||||
/// from all of them through one [`MultiServer::accept`] call.
|
||||
///
|
||||
/// TCP and QUIC accept loops handle many clients. The custom-UDP backend is single-peer-per-accept
|
||||
/// in v1 (a multi-client UDP demux is a documented follow-up), so with several clients prefer TCP or
|
||||
/// QUIC, or run one UDP server per client.
|
||||
pub struct MultiServer {
|
||||
rx: mpsc::Receiver<Accepted>,
|
||||
tasks: Vec<tokio::task::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl MultiServer {
|
||||
/// Bind and start accept loops for every transport whose address is set in `endpoints`.
|
||||
/// The QUIC outer-TLS cert reuses the Aura server cert from `proto_cfg`.
|
||||
///
|
||||
/// # Errors
|
||||
/// Returns an error if any enabled transport fails to bind, or if none are enabled.
|
||||
pub async fn bind(
|
||||
endpoints: Endpoints,
|
||||
proto_cfg: ServerConfig,
|
||||
udp: UdpOpts,
|
||||
tcp: TcpOpts,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (txc, rx) = mpsc::channel::<Accepted>(32);
|
||||
let mut tasks = Vec::new();
|
||||
|
||||
if let Some(addr) = endpoints.udp {
|
||||
let server = UdpServer::bind(addr, proto_cfg.clone(), udp)?;
|
||||
tasks.push(tokio::spawn(udp_accept_loop(server, txc.clone())));
|
||||
}
|
||||
if let Some(addr) = endpoints.tcp {
|
||||
let server = TcpServer::bind(addr, proto_cfg.clone(), tcp.clone()).await?;
|
||||
tasks.push(tokio::spawn(tcp_accept_loop(server, txc.clone())));
|
||||
}
|
||||
if let Some(addr) = endpoints.quic {
|
||||
let server = AuraServer::bind(
|
||||
addr,
|
||||
&proto_cfg.server_cert_pem,
|
||||
&proto_cfg.server_key_pem,
|
||||
proto_cfg.clone(),
|
||||
)?;
|
||||
tasks.push(tokio::spawn(quic_accept_loop(server, txc.clone())));
|
||||
}
|
||||
|
||||
if tasks.is_empty() {
|
||||
anyhow::bail!("MultiServer: no transports enabled");
|
||||
}
|
||||
Ok(Self { rx, tasks })
|
||||
}
|
||||
|
||||
/// Wait for the next accepted connection from any enabled transport. Returns `None` when all
|
||||
/// accept loops have stopped.
|
||||
pub async fn accept(&mut self) -> Option<Accepted> {
|
||||
self.rx.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MultiServer {
|
||||
fn drop(&mut self) {
|
||||
for t in &self.tasks {
|
||||
t.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn udp_accept_loop(server: UdpServer, tx: mpsc::Sender<Accepted>) {
|
||||
loop {
|
||||
match server.accept().await {
|
||||
Ok(conn) => {
|
||||
let peer_id = conn.peer_id().map(str::to_owned);
|
||||
let accepted = Accepted {
|
||||
conn: conn.into_dyn(),
|
||||
mode: TransportMode::Udp,
|
||||
peer_id,
|
||||
};
|
||||
if tx.send(accepted).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("udp accept failed: {e:#}");
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn tcp_accept_loop(server: TcpServer, tx: mpsc::Sender<Accepted>) {
|
||||
loop {
|
||||
match server.accept().await {
|
||||
Ok(conn) => {
|
||||
let peer_id = conn.peer_id().map(str::to_owned);
|
||||
let accepted = Accepted {
|
||||
conn: conn.into_dyn(),
|
||||
mode: TransportMode::Tcp,
|
||||
peer_id,
|
||||
};
|
||||
if tx.send(accepted).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("tcp accept failed: {e:#}");
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn quic_accept_loop(server: AuraServer, tx: mpsc::Sender<Accepted>) {
|
||||
loop {
|
||||
match server.accept().await {
|
||||
Ok(conn) => {
|
||||
let peer_id = conn.peer_id().map(str::to_owned);
|
||||
let accepted = Accepted {
|
||||
conn: conn.into_dyn(),
|
||||
mode: TransportMode::Quic,
|
||||
peer_id,
|
||||
};
|
||||
if tx.send(accepted).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!("quic accept failed: {e:#}");
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user