feat(cli): server IP pool + per-client routing (multi-client VPN concentrator)

Server now assigns each connected client an IP from a configurable pool and
maintains a client_ip -> AuraConnection map so packets read from the shared
TUN are dispatched to the right client (and each client's recv loop writes
back to the TUN). Removes v1's "single shared TUN, no NAT/pool" limitation;
turns the server into a proper multi-client VPN concentrator (paired with the
already-landed UDP multi-client demux).

- aura_cli::pool: IpPool + PoolStrategy {StaticOnly, DynamicOnly,
  StaticOrDynamic}; reserves network/broadcast/server-own IP; 15 tests.
- aura_cli::server_router: ServerRouter + ServerRoutes (Arc<RwLock<HashMap>>);
  central TUN read loop dispatching by dst_ip; spawn_inbound_forwarder per
  conn auto-unregisters and releases the IP on disconnect; 4 tests via
  MockTun + MockConn.
- aura_cli::config: [server.pool] {cidr, strategy, static} added with
  serde(default); legacy configs (only [tunnel] pool_cidr) fall back to a
  DynamicOnly pool (backward compatible, tested).
- aura_cli::server: accept loop now: pool.assign(peer_id) -> register ->
  spawn_inbound_forwarder; rejected static_only mismatches dropped+logged.
- config/server.toml.example: documented [server.pool] section.

Workspace: 141 tests passed (+24), clippy -D warnings clean, fmt clean. No
new workspace deps (async-trait added to cli dev-deps for mock traits in tests).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xah30
2026-05-27 01:41:29 +03:00
parent 4d1bdba55d
commit 0a73d5298b
9 changed files with 1702 additions and 42 deletions
+497
View File
@@ -0,0 +1,497 @@
//! Server-side per-client routing (v2 §18): one TUN, many connections, IP-pool dispatch.
//!
//! The v1 server bridged each accepted connection onto its own short-lived TUN, which only
//! supported a single active client at a time. v2 keeps **one** server-side TUN and
//! demultiplexes traffic to/from many concurrent clients using the [`IpPool`] -allocated client
//! IP as the routing key:
//!
//! * **TUN read -> conn send** (outbound, server -> peers): one task owns `tun.read_packet()`.
//! For each packet it parses the destination IP, looks up the matching client connection in the
//! `by_ip` map, and `send_packet`s the packet on that connection. Packets for unknown
//! destinations are dropped with a trace.
//! * **conn recv -> TUN write** (inbound, peers -> server): every accepted client gets a
//! dedicated task that loops on `conn.recv_packet().await` and forwards the result into a
//! single mpsc channel; one TUN-owner task drains the channel and writes packets to the TUN.
//!
//! Sharing a single `PacketIo` between the read-loop and the write-loop is awkward because both
//! halves take `&mut self`. Just like [`aura_tunnel::AuraRouter`], the router parks the TUN
//! behind one `select!` loop that handles both directions on the same task: a TUN read race
//! against a write coming off the inbound mpsc.
//!
//! The router is generic over [`aura_tunnel::PacketIo`] so tests can substitute a `MockTun` and
//! drive both halves without root.
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Arc;
use aura_proto::PacketConnection;
use aura_tunnel::router::dst_ip;
use aura_tunnel::PacketIo;
use tokio::sync::{mpsc, RwLock};
use crate::pool::IpPool;
/// Bounded capacity of the per-server inbound mpsc the per-conn tasks fan into.
///
/// Sized to absorb short bursts from many clients without making backpressure cheap. The TUN
/// owner drains continuously, so this only fills when the kernel is briefly behind on writes.
const INBOUND_CAPACITY: usize = 4096;
/// Per-client routing state shared between the registration path (accept-loop) and the data path
/// (TUN owner / per-conn tasks).
///
/// Cheap to clone (it is `Arc`-shaped internally) so the server's accept-loop and per-connection
/// tasks can all hold references to the same dispatch map.
#[derive(Clone)]
pub struct ServerRoutes {
/// Live `client_ip -> connection` map; the TUN owner reads, accept/disconnect mutate.
by_ip: Arc<RwLock<HashMap<IpAddr, Arc<dyn PacketConnection>>>>,
/// IP pool the server uses to assign and release client addresses.
pool: Arc<IpPool>,
}
impl ServerRoutes {
/// Build empty per-server routes over the given [`IpPool`].
pub fn new(pool: Arc<IpPool>) -> Self {
Self {
by_ip: Arc::new(RwLock::new(HashMap::new())),
pool,
}
}
/// Borrow the [`IpPool`] for assign/release calls.
pub fn pool(&self) -> &Arc<IpPool> {
&self.pool
}
/// Register a connection under `ip`. Returns the previously registered connection (if any),
/// which the caller should treat as a forced-disconnect race.
pub async fn register(
&self,
ip: IpAddr,
conn: Arc<dyn PacketConnection>,
) -> Option<Arc<dyn PacketConnection>> {
self.by_ip.write().await.insert(ip, conn)
}
/// Remove the connection registered under `ip`.
pub async fn unregister(&self, ip: IpAddr) {
self.by_ip.write().await.remove(&ip);
}
/// Snapshot the current dispatch map (mainly for tests / observability).
pub async fn snapshot_ips(&self) -> Vec<IpAddr> {
self.by_ip.read().await.keys().copied().collect()
}
/// Send a packet to the connection registered under `dst`. Returns `Ok(true)` if the packet
/// was queued to a connection; `Ok(false)` if no connection is registered under `dst`. Errors
/// from the connection's `send_packet` surface verbatim — the caller decides whether to
/// evict the client.
pub async fn dispatch(&self, dst: IpAddr, pkt: &[u8]) -> anyhow::Result<bool> {
let conn = {
let map = self.by_ip.read().await;
map.get(&dst).cloned()
};
match conn {
Some(c) => {
c.send_packet(pkt).await?;
Ok(true)
}
None => Ok(false),
}
}
}
/// The per-server "one TUN, many clients" router.
///
/// Owns a [`PacketIo`] device and a [`ServerRoutes`] dispatch table. [`Self::run`] drains
/// outbound TUN reads into the right connection, and inbound packets from any registered
/// connection to the single TUN.
///
/// The accept-loop drives [`Self::handle_connection`] from each `MultiServer::accept` so per-conn
/// state is created up-front and torn down when the connection ends.
pub struct ServerRouter<P: PacketIo> {
tun: P,
routes: ServerRoutes,
/// All per-conn tasks fan their inbound packets into this single channel; the owner of `tun`
/// drains the receiver.
inbound_tx: mpsc::Sender<Vec<u8>>,
inbound_rx: mpsc::Receiver<Vec<u8>>,
}
impl<P: PacketIo + 'static> ServerRouter<P> {
/// Build a fresh router with empty routes and the given pool.
pub fn new(tun: P, pool: Arc<IpPool>) -> Self {
Self::from_routes(tun, ServerRoutes::new(pool))
}
/// Build a router from an existing [`ServerRoutes`] (mainly for tests that pre-seed routes).
pub fn from_routes(tun: P, routes: ServerRoutes) -> Self {
let (inbound_tx, inbound_rx) = mpsc::channel::<Vec<u8>>(INBOUND_CAPACITY);
Self {
tun,
routes,
inbound_tx,
inbound_rx,
}
}
/// Clone the shared routes (so the accept-loop can call [`Self::spawn_connection`] from
/// outside the router task).
pub fn routes(&self) -> ServerRoutes {
self.routes.clone()
}
/// Clone the inbound sender so the accept-loop / handlers can spawn per-conn forwarder tasks
/// without holding a reference to the router.
pub fn inbound_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.inbound_tx.clone()
}
/// Spawn the per-connection inbound forwarder.
///
/// Loops `conn.recv_packet()` and forwards each packet into the shared `inbound_tx`. When the
/// connection errors out (peer closed, transport error, etc.) the function unregisters `ip`
/// from the routes and releases it back to the pool.
pub fn spawn_inbound_forwarder(
routes: ServerRoutes,
inbound_tx: mpsc::Sender<Vec<u8>>,
conn: Arc<dyn PacketConnection>,
ip: IpAddr,
peer_id: Option<String>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
loop {
match conn.recv_packet().await {
Ok(pkt) => {
if inbound_tx.send(pkt).await.is_err() {
tracing::debug!(
%ip, peer = ?peer_id,
"server router inbound channel closed; ending per-conn task"
);
break;
}
}
Err(e) => {
tracing::info!(
%ip, peer = ?peer_id, error = %e,
"client connection ended; releasing ip and unregistering"
);
break;
}
}
}
routes.unregister(ip).await;
routes.pool().release(ip).await;
})
}
/// Run the TUN owner loop: read packets from the TUN, dispatch to the right connection;
/// write packets coming in from the inbound channel to the TUN. Returns when either half
/// fails.
pub async fn run(mut self) -> anyhow::Result<()> {
loop {
tokio::select! {
read = self.tun.read_packet() => {
match read {
Ok(pkt) => {
if let Err(e) = self.route_outbound(&pkt).await {
// Connection-level errors are non-fatal for the router: log and
// keep serving other clients (the per-conn task will see its own
// error path and release the IP).
tracing::warn!(error = %e, "server router outbound send failed");
}
}
Err(e) => {
return Err(anyhow::Error::new(e).context("server TUN read failed"));
}
}
}
maybe_pkt = self.inbound_rx.recv() => {
match maybe_pkt {
Some(pkt) => {
if let Err(e) = self.tun.write_packet(&pkt).await {
return Err(anyhow::Error::new(e).context("server TUN write failed"));
}
}
None => {
// All inbound senders dropped (the accept-loop and all per-conn
// tasks). The router has no work left; exit.
return Ok(());
}
}
}
}
}
}
/// Classify one outbound packet and dispatch it to the matching client connection.
async fn route_outbound(&self, pkt: &[u8]) -> anyhow::Result<()> {
let Some(dst) = dst_ip(pkt) else {
tracing::trace!(len = pkt.len(), "dropping unparseable outbound packet");
return Ok(());
};
match self.routes.dispatch(dst, pkt).await? {
true => Ok(()),
false => {
tracing::trace!(%dst, len = pkt.len(), "no client registered for destination; dropping");
Ok(())
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::net::Ipv4Addr;
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex as TokMutex};
use crate::pool::PoolStrategy;
/// In-memory fake TUN: `read_packet` drains an injected queue, `write_packet` forwards to a
/// channel the test reads. Identical contract to the one in `aura-tunnel/tests/routes.rs`.
struct MockTun {
inbound: mpsc::Receiver<Vec<u8>>,
written: mpsc::Sender<Vec<u8>>,
}
#[async_trait]
impl PacketIo for MockTun {
async fn read_packet(&mut self) -> std::io::Result<Vec<u8>> {
match self.inbound.recv().await {
Some(pkt) => Ok(pkt),
None => Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"mock TUN closed",
)),
}
}
async fn write_packet(&mut self, packet: &[u8]) -> std::io::Result<()> {
self.written
.send(packet.to_vec())
.await
.map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "test dropped"))
}
}
/// Mock packet connection: `send_packet` forwards to a channel; `recv_packet` drains another.
struct MockConn {
sent: mpsc::Sender<Vec<u8>>,
to_recv: TokMutex<mpsc::Receiver<Vec<u8>>>,
}
#[async_trait]
impl PacketConnection for MockConn {
async fn send_packet(&self, packet: &[u8]) -> anyhow::Result<()> {
self.sent.send(packet.to_vec()).await?;
Ok(())
}
async fn recv_packet(&self) -> anyhow::Result<Vec<u8>> {
let mut rx = self.to_recv.lock().await;
rx.recv()
.await
.ok_or_else(|| anyhow::anyhow!("mock conn closed"))
}
}
fn ipv4_packet_to(dst: Ipv4Addr) -> Vec<u8> {
let mut pkt = vec![0u8; 20];
pkt[0] = 0x45;
let o = dst.octets();
pkt[16..20].copy_from_slice(&o);
pkt
}
fn ip(s: &str) -> IpAddr {
s.parse().unwrap()
}
fn net(s: &str) -> ipnetwork::IpNetwork {
s.parse().unwrap()
}
/// End-to-end test of the server router's data path:
/// * Two MockConns are registered under .2 and .3.
/// * A TUN packet to .2 should reach conn-A; to .3 -> conn-B; to .99 -> dropped.
/// * A packet sent into conn-A's recv channel should be written verbatim to the TUN.
#[tokio::test]
async fn server_router_dispatches_by_destination_ip() {
// Build a pool with .1 reserved as the server's IP.
let pool = Arc::new(
IpPool::new(
net("10.0.0.0/29"),
PoolStrategy::DynamicOnly,
HashMap::new(),
ip("10.0.0.1"),
)
.unwrap(),
);
// Wire up the mock TUN and the router.
let (tun_in_tx, tun_in_rx) = mpsc::channel::<Vec<u8>>(8);
let (tun_out_tx, mut tun_out_rx) = mpsc::channel::<Vec<u8>>(8);
let tun = MockTun {
inbound: tun_in_rx,
written: tun_out_tx,
};
let router = ServerRouter::new(tun, pool.clone());
let routes = router.routes();
let inbound_tx = router.inbound_sender();
// Allocate two clients via the pool and register them under those IPs.
let ip_a = pool.assign("client-a").await.expect("ip-a");
let ip_b = pool.assign("client-b").await.expect("ip-b");
let (sent_a_tx, mut sent_a_rx) = mpsc::channel::<Vec<u8>>(8);
let (recv_a_tx, recv_a_rx) = mpsc::channel::<Vec<u8>>(8);
let conn_a: Arc<dyn PacketConnection> = Arc::new(MockConn {
sent: sent_a_tx,
to_recv: TokMutex::new(recv_a_rx),
});
let (sent_b_tx, mut sent_b_rx) = mpsc::channel::<Vec<u8>>(8);
let (recv_b_tx, recv_b_rx) = mpsc::channel::<Vec<u8>>(8);
let conn_b: Arc<dyn PacketConnection> = Arc::new(MockConn {
sent: sent_b_tx,
to_recv: TokMutex::new(recv_b_rx),
});
routes.register(ip_a, Arc::clone(&conn_a)).await;
routes.register(ip_b, Arc::clone(&conn_b)).await;
// Spawn per-conn forwarders and the router itself.
let _fwd_a = ServerRouter::<MockTun>::spawn_inbound_forwarder(
routes.clone(),
inbound_tx.clone(),
Arc::clone(&conn_a),
ip_a,
Some("client-a".into()),
);
let _fwd_b = ServerRouter::<MockTun>::spawn_inbound_forwarder(
routes.clone(),
inbound_tx,
Arc::clone(&conn_b),
ip_b,
Some("client-b".into()),
);
let handle = tokio::spawn(router.run());
// 1. TUN read -> dispatch to conn-A by destination = ip_a.
let pkt_to_a = ipv4_packet_to(match ip_a {
IpAddr::V4(v4) => v4,
_ => panic!("ipv4 in this test"),
});
tun_in_tx.send(pkt_to_a.clone()).await.unwrap();
let got = tokio::time::timeout(std::time::Duration::from_secs(2), sent_a_rx.recv())
.await
.expect("router did not dispatch to client-a in time")
.expect("client-a sent channel closed");
assert_eq!(got, pkt_to_a, "conn-a should receive the packet verbatim");
// 2. TUN read -> dispatch to conn-B by destination = ip_b.
let pkt_to_b = ipv4_packet_to(match ip_b {
IpAddr::V4(v4) => v4,
_ => panic!("ipv4 in this test"),
});
tun_in_tx.send(pkt_to_b.clone()).await.unwrap();
let got = tokio::time::timeout(std::time::Duration::from_secs(2), sent_b_rx.recv())
.await
.expect("router did not dispatch to client-b in time")
.expect("client-b sent channel closed");
assert_eq!(got, pkt_to_b);
// 3. TUN read -> destination .99 has nobody registered; must be silently dropped (no
// crash, nothing reaches either conn).
let pkt_to_unknown = ipv4_packet_to(Ipv4Addr::new(10, 0, 0, 99));
tun_in_tx.send(pkt_to_unknown).await.unwrap();
let res =
tokio::time::timeout(std::time::Duration::from_millis(200), sent_a_rx.recv()).await;
assert!(res.is_err(), "no packet should reach conn-a");
let res =
tokio::time::timeout(std::time::Duration::from_millis(200), sent_b_rx.recv()).await;
assert!(res.is_err(), "no packet should reach conn-b");
// 4. Inbound: conn-A's recv channel -> TUN write.
let in_pkt = ipv4_packet_to(Ipv4Addr::new(8, 8, 8, 8));
recv_a_tx.send(in_pkt.clone()).await.unwrap();
let written = tokio::time::timeout(std::time::Duration::from_secs(2), tun_out_rx.recv())
.await
.expect("router did not forward inbound to TUN in time")
.expect("TUN write channel closed");
assert_eq!(written, in_pkt);
// 5. Inbound from conn-B: TUN must also receive it (one TUN, many sources).
let in_pkt_b = ipv4_packet_to(Ipv4Addr::new(1, 1, 1, 1));
recv_b_tx.send(in_pkt_b.clone()).await.unwrap();
let written = tokio::time::timeout(std::time::Duration::from_secs(2), tun_out_rx.recv())
.await
.expect("inbound-b did not reach TUN")
.expect("TUN write channel closed");
assert_eq!(written, in_pkt_b);
// Cleanup: closing the TUN read side bubbles an EOF up the run-loop.
drop(tun_in_tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
}
/// When a per-conn task ends (e.g. the connection errors), the router must release the IP
/// back to the pool and remove the dispatch entry so a re-registration can succeed.
#[tokio::test]
async fn per_conn_task_releases_ip_and_unregisters_on_error() {
let pool = Arc::new(
IpPool::new(
net("10.0.0.0/30"), // only .2 is dynamically usable
PoolStrategy::DynamicOnly,
HashMap::new(),
ip("10.0.0.1"),
)
.unwrap(),
);
let (_tun_in_tx, tun_in_rx) = mpsc::channel::<Vec<u8>>(8);
let (tun_out_tx, _tun_out_rx) = mpsc::channel::<Vec<u8>>(8);
let tun = MockTun {
inbound: tun_in_rx,
written: tun_out_tx,
};
let router = ServerRouter::new(tun, pool.clone());
let routes = router.routes();
let inbound_tx = router.inbound_sender();
let _handle = tokio::spawn(router.run());
let ip_a = pool.assign("client-a").await.expect("only usable ip");
let (sent_tx, _sent_rx) = mpsc::channel::<Vec<u8>>(8);
let (recv_tx, recv_rx) = mpsc::channel::<Vec<u8>>(8);
let conn: Arc<dyn PacketConnection> = Arc::new(MockConn {
sent: sent_tx,
to_recv: TokMutex::new(recv_rx),
});
routes.register(ip_a, Arc::clone(&conn)).await;
let fwd = ServerRouter::<MockTun>::spawn_inbound_forwarder(
routes.clone(),
inbound_tx,
Arc::clone(&conn),
ip_a,
Some("client-a".into()),
);
// Drop the recv sender to make `conn.recv_packet()` return an error, which ends the task.
drop(recv_tx);
let _ = tokio::time::timeout(std::time::Duration::from_secs(2), fwd).await;
// The IP must be releasable now (re-assigned without exhaustion).
let next = pool.assign("client-b").await.expect("ip back in pool");
assert_eq!(next, ip_a, "released ip should be reusable");
// And the dispatch map should not still contain a stale entry under ip_a.
let snap = routes.snapshot_ips().await;
assert!(
!snap.contains(&ip_a),
"stale entry should have been unregistered"
);
}
}