// Package transport implements the Aura UDP client: a reliable HS-adapter wrapping the // handshake so it can run over lossy UDP, plus the post-handshake datagram data path. // // Wire layout (mirrors aura-transport/src/udp.rs): // // 0x01 HS : [optional 16-byte knock prefix] || 0x01 || hs_seq(u16 BE) || ack_upto(u16 BE) || msg_bytes // 0x02 DATA : 0x02 || rec_len(u16 BE) || sealed_record [|| random_padding] // // The HS phase is a DTLS-flight style reliability layer: every sent datagram is retransmitted // every `hs_rto` until either acked or the overall `hs_timeout` elapses; cumulative acks prune // the retransmit queue. package transport import ( "context" "encoding/binary" "errors" "fmt" "io" "net" "sort" "sync" "time" "github.com/aura/singbox-aura/aura/frame" "github.com/aura/singbox-aura/aura/handshake" "github.com/aura/singbox-aura/aura/session" ) // Wire-layer type bytes. const ( typeHS byte = 0x01 typeDATA byte = 0x02 ) // HS header layout: type(1) || hs_seq(2 BE) || ack_upto(2 BE) || msg_bytes. const hsPrefixLen = 1 + 2 + 2 // DATA header layout: type(1) || rec_len(2 BE) || sealed_record. const dataPrefixLen = 1 + 2 // AckNone is the on-wire sentinel for "I have received nothing yet". const ackNone uint16 = 0xFFFF // Default UDP read buffer — large enough for ClientHello (1253 bytes + headers) with slack. const recvBuf = 2048 // Options exposes the same knobs as Rust's UdpOpts. Defaults intentionally match. type Options struct { // Probe resistance (optional). When KnockEnabled is true, KnockKey must be 32 bytes. KnockEnabled bool KnockKey [32]byte // Handshake retransmit timeout: every HsRTO, all unacked HS datagrams are resent. HsRTO time.Duration // Overall handshake deadline. HsTimeout time.Duration // Linger duration: after the handshake completes, the client briefly resends the final // flight to recover from a lost last message. HsLinger time.Duration } // DefaultOptions matches Rust's UdpOpts::default sans obfuscation / cover-traffic (a TODO for v1 // of the Go port). func DefaultOptions() *Options { return &Options{ HsRTO: 250 * time.Millisecond, HsTimeout: 10 * time.Second, HsLinger: 2 * time.Second, } } // Connection is an established Aura UDP connection. After Dial succeeds, the caller uses Send // / Recv to ship application packets. type Connection struct { conn *net.UDPConn sender *session.DatagramSender recvr *session.DatagramReceiver peer string mu sync.Mutex // serializes sender access (Pong replies + user sends) } // PeerID returns the verified peer identity (the server name). func (c *Connection) PeerID() string { return c.peer } // Send seals one application packet as a Frame::Data on stream 0 and ships it in one DATA // datagram. func (c *Connection) Send(payload []byte) error { c.mu.Lock() rec := c.sender.Seal(&frame.Frame{Kind: frame.FrameData, StreamID: 0, Payload: payload}) c.mu.Unlock() return c.writeDataDgram(rec) } // Recv blocks until the next application packet arrives. Ping is answered with Pong // transparently; Pong is ignored; Close surfaces as an error (terminating the connection). func (c *Connection) Recv() ([]byte, error) { buf := make([]byte, recvBuf) for { n, err := c.conn.Read(buf) if err != nil { return nil, err } dg := buf[:n] if len(dg) == 0 { continue } switch dg[0] { case typeDATA: if len(dg) < dataPrefixLen { continue } recLen := int(binary.BigEndian.Uint16(dg[1:3])) end := dataPrefixLen + recLen if len(dg) < end { continue } f, err := c.recvr.Open(dg[dataPrefixLen:end]) if err != nil { continue // replay / tampered / out-of-window: defensive drop } switch f.Kind { case frame.FrameData: return f.Payload, nil case frame.FramePing: // Answer with Pong on the same datagram path. c.mu.Lock() rec := c.sender.Seal(&frame.Frame{Kind: frame.FramePong, Seq: f.Seq}) c.mu.Unlock() if err := c.writeDataDgram(rec); err != nil { return nil, err } case frame.FramePong: continue case frame.FrameClose: return nil, fmt.Errorf("aura/transport: peer closed (code=%d): %s", f.Code, f.Reason) } case typeHS: // Late HS retransmit on the data path: ignore. continue default: continue } } } // Close releases the underlying socket. func (c *Connection) Close() error { return c.conn.Close() } func (c *Connection) writeDataDgram(rec []byte) error { if len(rec) > 0xFFFF { return fmt.Errorf("aura/transport: sealed record too large: %d", len(rec)) } dg := make([]byte, 0, dataPrefixLen+len(rec)) dg = append(dg, typeDATA) var lb [2]byte binary.BigEndian.PutUint16(lb[:], uint16(len(rec))) dg = append(dg, lb[:]...) dg = append(dg, rec...) _, err := c.conn.Write(dg) return err } // Dial connects to an Aura UDP server, performs the mutual-auth handshake over the reliable // adapter, and returns an established Connection. func Dial(ctx context.Context, addr string, hsCfg *handshake.ClientConfig, opts *Options) (*Connection, error) { if opts == nil { opts = DefaultOptions() } rAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, fmt.Errorf("resolve %s: %w", addr, err) } conn, err := net.DialUDP("udp", nil, rAddr) if err != nil { return nil, fmt.Errorf("dial udp: %w", err) } // The reliable adapter manages send/recv during the handshake; once we have a Connection // the user owns it. adapter := newHSAdapter(conn, opts) done := make(chan struct{}) adapter.start(done) defer close(done) // stop the driver once Dial returns res, err := handshake.Client(adapter, adapter, hsCfg) if err != nil { _ = conn.Close() return nil, fmt.Errorf("aura handshake: %w", err) } // After the handshake, build datagram codecs starting at PostHandshakeCounter (2): both // directions sealed exactly two encrypted handshake messages (Auth + Finished), so the AEAD // counters resume from there. sender, err := session.NewDatagramSender(res.C2S[:], session.PostHandshakeCounter) if err != nil { _ = conn.Close() return nil, err } recvr, err := session.NewDatagramReceiver(res.S2C[:], session.PostHandshakeCounter) if err != nil { _ = conn.Close() return nil, err } // Linger: briefly resend the last unacked flight so a lost final message is recovered. adapter.linger(opts.HsRTO, opts.HsLinger) return &Connection{ conn: conn, sender: sender, recvr: recvr, peer: res.PeerID, }, nil } // ============================================================================================ // Reliable HS adapter // ============================================================================================ // hsAdapter wraps a *net.UDPConn with a small DTLS-flight reliability layer. It implements // io.Reader and io.Writer so handshake.Client can drive it like a stream — the adapter parses // the 5-byte Aura frame header in its outbound buffer to know each message's total length, so // each whole frame becomes exactly one HS datagram. type hsAdapter struct { conn *net.UDPConn opts *Options mu sync.Mutex // Outbound: bytes the handshake wrote but not yet framed into an HS datagram. outPartial []byte // Outbound: hs_seq -> msg_bytes for retransmit. unacked map[uint16][]byte // Outbound: next hs_seq to stamp. nextSendSeq uint16 // Inbound: hs_seq -> received msg_bytes (reorder buffer). inBuf map[uint16][]byte // Inbound: next hs_seq we expect to deliver. nextDeliverSeq uint16 // Inbound: bytes delivered in order but not yet read by the caller. ready []byte readyPos int // Signals from the network goroutine to a parked reader. readCond *sync.Cond closed bool // Network goroutine errors (only the first sticks). netErr error } func newHSAdapter(conn *net.UDPConn, opts *Options) *hsAdapter { a := &hsAdapter{ conn: conn, opts: opts, unacked: make(map[uint16][]byte), inBuf: make(map[uint16][]byte), } a.readCond = sync.NewCond(&a.mu) return a } func (a *hsAdapter) ackUpto() uint16 { if a.nextDeliverSeq == 0 { return ackNone } return a.nextDeliverSeq - 1 } // pruneAcked drops every entry with hs_seq <= ack_upto (cumulative ack). func (a *hsAdapter) pruneAcked(ackUpto uint16) { if ackUpto == ackNone { return } for k := range a.unacked { if k <= ackUpto { delete(a.unacked, k) } } } // acceptIncoming integrates a received HS payload at seq, advancing contiguous delivery. func (a *hsAdapter) acceptIncoming(seq uint16, msg []byte) { if seq < a.nextDeliverSeq { return // already delivered (a retransmit): drop } if _, ok := a.inBuf[seq]; !ok { a.inBuf[seq] = msg } before := len(a.ready) for { m, ok := a.inBuf[a.nextDeliverSeq] if !ok { break } delete(a.inBuf, a.nextDeliverSeq) a.ready = append(a.ready, m...) a.nextDeliverSeq++ // wraps mod 2^16 } if len(a.ready) > before { a.readCond.Broadcast() } } // sendHS builds and sends one HS datagram carrying msg at seq+ack. // Called under the lock or with the values already snapshotted. func (a *hsAdapter) sendHS(seq, ack uint16, msg []byte) error { prefix := 0 if a.opts.KnockEnabled { prefix = KnockLen } dg := make([]byte, 0, prefix+hsPrefixLen+len(msg)) if a.opts.KnockEnabled { tok := KnockForMinute(a.opts.KnockKey, CurrentUnixMinute()) dg = append(dg, tok[:]...) } dg = append(dg, typeHS) var sb [2]byte binary.BigEndian.PutUint16(sb[:], seq) dg = append(dg, sb[:]...) binary.BigEndian.PutUint16(sb[:], ack) dg = append(dg, sb[:]...) dg = append(dg, msg...) _, err := a.conn.Write(dg) return err } // flushOutgoing parses message boundaries out of outPartial and emits one HS datagram per // whole frame. Holds the lock internally; safe to call from any goroutine. func (a *hsAdapter) flushOutgoing() error { for { a.mu.Lock() if len(a.outPartial) < frame.HeaderLen { a.mu.Unlock() return nil } var hdr [frame.HeaderLen]byte copy(hdr[:], a.outPartial[:frame.HeaderLen]) _, plen, err := frame.DecodeHeader(hdr) if err != nil { a.mu.Unlock() return nil // wait for more bytes } total := frame.HeaderLen + plen if len(a.outPartial) < total { a.mu.Unlock() return nil } msg := make([]byte, total) copy(msg, a.outPartial[:total]) a.outPartial = a.outPartial[total:] seq := a.nextSendSeq a.nextSendSeq++ ack := a.ackUpto() a.unacked[seq] = msg a.mu.Unlock() if err := a.sendHS(seq, ack, msg); err != nil { return err } } } // maybeBareAck emits a zero-length HS datagram so the peer can prune its retransmit queue. The // bare ack does not consume a sequence number. func (a *hsAdapter) maybeBareAck() error { a.mu.Lock() should := a.nextDeliverSeq > 0 && len(a.outPartial) == 0 seq := a.nextSendSeq ack := a.ackUpto() a.mu.Unlock() if !should { return nil } return a.sendHS(seq, ack, nil) } // retransmitUnacked re-sends every still-unacked HS datagram. Called on the RTO timer. func (a *hsAdapter) retransmitUnacked() error { a.mu.Lock() ack := a.ackUpto() // Iterate in seq order for deterministic wire behaviour. seqs := make([]uint16, 0, len(a.unacked)) for k := range a.unacked { seqs = append(seqs, k) } sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) batch := make([][2]any, 0, len(seqs)) for _, s := range seqs { batch = append(batch, [2]any{s, a.unacked[s]}) } a.mu.Unlock() for _, e := range batch { seq := e[0].(uint16) msg := e[1].([]byte) if err := a.sendHS(seq, ack, msg); err != nil { return err } } return nil } // pumpOneIncoming reads and integrates exactly one HS datagram. func (a *hsAdapter) pumpOneIncoming() error { buf := make([]byte, recvBuf) n, err := a.conn.Read(buf) if err != nil { return err } dg := buf[:n] if len(dg) == 0 || dg[0] != typeHS || len(dg) < hsPrefixLen { return nil } seq := binary.BigEndian.Uint16(dg[1:3]) ack := binary.BigEndian.Uint16(dg[3:5]) msg := append([]byte{}, dg[hsPrefixLen:]...) a.mu.Lock() a.pruneAcked(ack) if len(msg) > 0 { a.acceptIncoming(seq, msg) } a.mu.Unlock() return nil } // start launches the driver goroutine that interleaves I/O while the handshake future runs. // The driver stops when `done` is closed. func (a *hsAdapter) start(done chan struct{}) { // Reader goroutine. go func() { for { // Use a short read timeout so the goroutine can notice `done` promptly. _ = a.conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond)) if err := a.pumpOneIncoming(); err != nil { if isTimeout(err) { select { case <-done: return default: continue } } a.setErr(err) return } // After pumping an incoming datagram, flush any replies + maybe a bare ack. _ = a.flushOutgoing() _ = a.maybeBareAck() select { case <-done: return default: } } }() // RTO + timeout driver. go func() { rto := time.NewTicker(a.opts.HsRTO) defer rto.Stop() dead := time.NewTimer(a.opts.HsTimeout) defer dead.Stop() for { select { case <-done: return case <-rto.C: _ = a.flushOutgoing() _ = a.retransmitUnacked() case <-dead.C: a.setErr(fmt.Errorf("aura/transport: UDP handshake timed out after %s", a.opts.HsTimeout)) return } } }() } // linger briefly resends the last unacked flight after the handshake returns. Stops early if // nothing is unacked. func (a *hsAdapter) linger(rto, total time.Duration) { rounds := 3 per := rto if total/time.Duration(rounds) < per { per = total / time.Duration(rounds) } for i := 0; i < rounds; i++ { a.mu.Lock() empty := len(a.unacked) == 0 a.mu.Unlock() if empty { return } _ = a.retransmitUnacked() time.Sleep(per) } } func (a *hsAdapter) setErr(err error) { a.mu.Lock() if a.netErr == nil { a.netErr = err } a.closed = true a.readCond.Broadcast() a.mu.Unlock() } // Read implements io.Reader for the handshake driver: hand out already-delivered contiguous // bytes. Blocks (via Cond) until some bytes are ready or the adapter is closed/errored. func (a *hsAdapter) Read(p []byte) (int, error) { a.mu.Lock() defer a.mu.Unlock() for { if a.readyPos < len(a.ready) { n := copy(p, a.ready[a.readyPos:]) a.readyPos += n if a.readyPos == len(a.ready) { a.ready = a.ready[:0] a.readyPos = 0 } return n, nil } if a.netErr != nil { return 0, a.netErr } if a.closed { return 0, io.EOF } a.readCond.Wait() } } // Write implements io.Writer: append to outPartial and flush any newly-complete messages. func (a *hsAdapter) Write(p []byte) (int, error) { a.mu.Lock() a.outPartial = append(a.outPartial, p...) a.mu.Unlock() if err := a.flushOutgoing(); err != nil { return 0, err } return len(p), nil } // isTimeout returns true if err is a net.Error with Timeout()==true. func isTimeout(err error) bool { var ne net.Error return err != nil && errors.As(err, &ne) && ne.Timeout() }