refactor(proto): add Session::split() for full-duplex data path

Compose Session from SessionSender (writer + outbound AEAD/seq) and
SessionReceiver (reader + inbound AEAD + replay window); split() hands back
the two halves so a VPN data path can run concurrent read/write tasks
(recv_frame is not cancellation-safe, so select! on one &mut Session is unsafe).
send_frame/recv_frame/peer_id/into_inner unchanged; 13 tests still green.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
xah30
2026-05-25 18:07:55 +03:00
parent bb835e4ca7
commit 5d88d57223
+122 -46
View File
@@ -3,7 +3,8 @@
//! A [`Session`] owns the transport reader + writer and the two directional [`AeadSession`]s
//! produced by the handshake. It exposes [`Session::send_frame`] / [`Session::recv_frame`], which
//! serialize a [`Frame`], AEAD-seal/open it, and ship it inside a [`MsgType::Data`] record framed
//! by the 5-byte protocol header.
//! by the 5-byte protocol header. For full-duplex use (e.g. a VPN data path) call
//! [`Session::split`] to get independent [`SessionSender`] / [`SessionReceiver`] halves.
//!
//! ## Record format and replay protection
//!
@@ -104,61 +105,22 @@ impl ReplayWindow {
}
}
/// An established, encrypted Aura session over a transport reader `R` and writer `W`.
/// The send half of a [`Session`]: owns the writer plus the outbound AEAD and sequence counter.
///
/// Created by [`crate::client_handshake`] / [`crate::server_handshake`]. Use
/// [`Session::send_frame`] and [`Session::recv_frame`] for application traffic.
pub struct Session<R, W> {
reader: R,
/// Obtained via [`Session::split`]. It can be moved into a dedicated writer task so a connection is
/// driven full-duplex (one task sending, one receiving) without sharing a single `&mut Session`.
pub struct SessionSender<W> {
writer: W,
/// AEAD this endpoint seals outgoing Data with.
send_aead: AeadSession,
/// AEAD this endpoint opens incoming Data with.
recv_aead: AeadSession,
/// Next sequence number to stamp on an outgoing Data record (mirrors `send_aead`'s counter).
send_seq: u64,
/// Replay window over incoming Data sequence numbers.
replay: ReplayWindow,
/// The verified identity (Common Name) of the peer, learned during the handshake.
peer_id: Option<String>,
}
impl<R, W> Session<R, W>
impl<W> SessionSender<W>
where
R: tokio::io::AsyncRead + Unpin,
W: tokio::io::AsyncWrite + Unpin,
{
/// Assemble a session from the handshake outputs.
///
/// `start_counter` is the AEAD nonce counter both directions have reached after the encrypted
/// handshake messages (so the first Data record stamps `seq == start_counter`). `peer_id` is
/// the verified peer Common Name (the server learns the client id; the client may store the
/// server name).
pub(crate) fn new(
reader: R,
writer: W,
send_aead: AeadSession,
recv_aead: AeadSession,
start_counter: u64,
peer_id: Option<String>,
) -> Self {
Self {
reader,
writer,
send_aead,
recv_aead,
send_seq: start_counter,
replay: ReplayWindow::new(start_counter),
peer_id,
}
}
/// The verified identity (Common Name) of the peer, if one was captured during the handshake.
#[must_use]
pub fn peer_id(&self) -> Option<&str> {
self.peer_id.as_deref()
}
/// Serialize, seal, and send a single application [`Frame`].
///
/// # Errors
@@ -194,6 +156,31 @@ where
Ok(())
}
/// Consume the send half, returning the underlying writer.
#[must_use]
pub fn into_inner(self) -> W {
self.writer
}
}
/// The receive half of a [`Session`]: owns the reader plus the inbound AEAD and replay window.
///
/// Obtained via [`Session::split`]. It can be moved into a dedicated reader task (see
/// [`SessionSender`]). Note: [`recv_frame`](SessionReceiver::recv_frame) is **not**
/// cancellation-safe — drive it from a single owning task rather than racing it in
/// `tokio::select!`.
pub struct SessionReceiver<R> {
reader: R,
/// AEAD this endpoint opens incoming Data with.
recv_aead: AeadSession,
/// Replay window over incoming Data sequence numbers.
replay: ReplayWindow,
}
impl<R> SessionReceiver<R>
where
R: tokio::io::AsyncRead + Unpin,
{
/// Receive, open, and decode a single application [`Frame`].
///
/// # Errors
@@ -239,10 +226,99 @@ where
Frame::decode(&plaintext)
}
/// Consume the receive half, returning the underlying reader.
#[must_use]
pub fn into_inner(self) -> R {
self.reader
}
}
/// An established, encrypted Aura session over a transport reader `R` and writer `W`.
///
/// Created by [`crate::client_handshake`] / [`crate::server_handshake`]. Use
/// [`Session::send_frame`] / [`Session::recv_frame`] for half-duplex convenience, or
/// [`Session::split`] to obtain independent [`SessionSender`] / [`SessionReceiver`] halves that can
/// be moved into separate tasks for full-duplex operation (e.g. a VPN data path with concurrent
/// read and write).
pub struct Session<R, W> {
sender: SessionSender<W>,
receiver: SessionReceiver<R>,
/// The verified identity (Common Name) of the peer, learned during the handshake.
peer_id: Option<String>,
}
impl<R, W> Session<R, W>
where
R: tokio::io::AsyncRead + Unpin,
W: tokio::io::AsyncWrite + Unpin,
{
/// Assemble a session from the handshake outputs.
///
/// `start_counter` is the AEAD nonce counter both directions have reached after the encrypted
/// handshake messages (so the first Data record stamps `seq == start_counter`). `peer_id` is
/// the verified peer Common Name (the server learns the client id; the client may store the
/// server name).
pub(crate) fn new(
reader: R,
writer: W,
send_aead: AeadSession,
recv_aead: AeadSession,
start_counter: u64,
peer_id: Option<String>,
) -> Self {
Self {
sender: SessionSender {
writer,
send_aead,
send_seq: start_counter,
},
receiver: SessionReceiver {
reader,
recv_aead,
replay: ReplayWindow::new(start_counter),
},
peer_id,
}
}
/// The verified identity (Common Name) of the peer, if one was captured during the handshake.
#[must_use]
pub fn peer_id(&self) -> Option<&str> {
self.peer_id.as_deref()
}
/// Serialize, seal, and send a single application [`Frame`] (half-duplex convenience).
///
/// # Errors
/// See [`SessionSender::send_frame`].
pub async fn send_frame(&mut self, frame: Frame) -> Result<(), ProtoError> {
self.sender.send_frame(frame).await
}
/// Receive, open, and decode a single application [`Frame`] (half-duplex convenience).
///
/// # Errors
/// See [`SessionReceiver::recv_frame`].
pub async fn recv_frame(&mut self) -> Result<Frame, ProtoError> {
self.receiver.recv_frame().await
}
/// Split into independent send and receive halves for full-duplex operation.
///
/// The two halves own disjoint state (writer + outbound AEAD vs. reader + inbound AEAD +
/// replay window), so they can be moved into separate tasks and driven concurrently. Capture
/// [`Session::peer_id`] before splitting if you still need it.
#[must_use]
pub fn split(self) -> (SessionSender<W>, SessionReceiver<R>) {
(self.sender, self.receiver)
}
/// Consume the session, returning its transport halves (for clean shutdown / reuse).
#[must_use]
pub fn into_inner(self) -> (R, W) {
(self.reader, self.writer)
let SessionSender { writer, .. } = self.sender;
let SessionReceiver { reader, .. } = self.receiver;
(reader, writer)
}
}