//! `test_data_exchange_1000pkts` — after the handshake, exchange 1000 Data frames in each //! direction and assert payload integrity and ordering. mod common; use aura_proto::{client_handshake, server_handshake, Frame}; use bytes::Bytes; use tokio::io::split; const N: u32 = 1000; /// Build the deterministic payload for frame `i` from `who`. fn payload_for(who: &str, i: u32) -> Bytes { Bytes::from(format!( "{who}-packet-{i}-{}", "x".repeat((i % 37) as usize) )) } #[tokio::test] async fn test_data_exchange_1000pkts() { let pki = common::mint_pki("vpn.aura.example", "client-alpha"); let client_cfg = pki.client_config(); let server_cfg = pki.server_config(); let (client_end, server_end) = tokio::io::duplex(64 * 1024); let (c_read, c_write) = split(client_end); let (s_read, s_write) = split(server_end); let client = tokio::spawn(async move { let mut sess = client_handshake(c_read, c_write, &client_cfg) .await .expect("client handshake"); // Interleave send + recv in lockstep to avoid filling the duplex buffer. for i in 0..N { sess.send_frame(Frame::Data { stream_id: 1, payload: payload_for("client", i), }) .await .expect("client send"); match sess.recv_frame().await.expect("client recv") { Frame::Data { stream_id, payload } => { assert_eq!(stream_id, 2, "wrong stream id at i={i}"); assert_eq!( payload, payload_for("server", i), "payload mismatch at i={i}" ); } other => panic!("client expected Data, got {other:?}"), } } }); let server = tokio::spawn(async move { let mut sess = server_handshake(s_read, s_write, &server_cfg) .await .expect("server handshake"); for i in 0..N { // Receive the client's i-th packet first, then reply, mirroring the client's lockstep. match sess.recv_frame().await.expect("server recv") { Frame::Data { stream_id, payload } => { assert_eq!(stream_id, 1, "wrong stream id at i={i}"); assert_eq!( payload, payload_for("client", i), "payload mismatch at i={i}" ); } other => panic!("server expected Data, got {other:?}"), } sess.send_frame(Frame::Data { stream_id: 2, payload: payload_for("server", i), }) .await .expect("server send"); } }); let (c, s) = tokio::join!(client, server); c.expect("client task"); s.expect("server task"); } #[tokio::test] async fn ping_pong_and_close_frames_roundtrip() { let pki = common::mint_pki("vpn.aura.example", "c1"); let client_cfg = pki.client_config(); let server_cfg = pki.server_config(); let (client_end, server_end) = tokio::io::duplex(64 * 1024); let (c_read, c_write) = split(client_end); let (s_read, s_write) = split(server_end); let client = tokio::spawn(async move { let mut sess = client_handshake(c_read, c_write, &client_cfg) .await .unwrap(); sess.send_frame(Frame::Ping { seq: 7 }).await.unwrap(); match sess.recv_frame().await.unwrap() { Frame::Pong { seq } => assert_eq!(seq, 7), other => panic!("expected Pong, got {other:?}"), } sess.send_frame(Frame::Close { code: 0, reason: "bye".into(), }) .await .unwrap(); }); let server = tokio::spawn(async move { let mut sess = server_handshake(s_read, s_write, &server_cfg) .await .unwrap(); match sess.recv_frame().await.unwrap() { Frame::Ping { seq } => sess.send_frame(Frame::Pong { seq }).await.unwrap(), other => panic!("expected Ping, got {other:?}"), } match sess.recv_frame().await.unwrap() { Frame::Close { code, reason } => { assert_eq!(code, 0); assert_eq!(reason, "bye"); } other => panic!("expected Close, got {other:?}"), } }); let (c, s) = tokio::join!(client, server); c.unwrap(); s.unwrap(); }