diff --git a/internal/redirect/tcp.go b/internal/redirect/tcp.go new file mode 100644 index 0000000..73a2416 --- /dev/null +++ b/internal/redirect/tcp.go @@ -0,0 +1,177 @@ +package redirect + +import ( + "context" + "errors" + "fmt" + "io" + "net" + "sync" + "time" + + "git.okcu.io/root/drover-go/internal/socks5" +) + +// Config configures the TCP redirector. +type Config struct { + SOCKS5 socks5.Config + Bind string // "127.0.0.1:0" — listener bind addr +} + +type mapping struct { + dstIP net.IP + dstPort uint16 + added time.Time +} + +// Redirector is the loopback listener that catches NAT-rewritten SYNs +// from divert and tunnels them through SOCKS5. +type Redirector struct { + cfg Config + ln net.Listener + mu sync.RWMutex + flows map[uint16]mapping // src_port → mapping + wg sync.WaitGroup + ctx context.Context + cnl context.CancelFunc +} + +// New starts a Redirector. It binds the listener but does not yet +// have any mappings; SetMapping is called by the divert layer when +// it sees an outbound SYN from a target PID. +func New(cfg Config) (*Redirector, error) { + bind := cfg.Bind + if bind == "" { + bind = "127.0.0.1:0" + } + ln, err := net.Listen("tcp", bind) + if err != nil { + return nil, fmt.Errorf("listen %s: %w", bind, err) + } + ctx, cnl := context.WithCancel(context.Background()) + r := &Redirector{ + cfg: cfg, + ln: ln, + flows: map[uint16]mapping{}, + ctx: ctx, + cnl: cnl, + } + r.wg.Add(1) + go r.acceptLoop() + r.wg.Add(1) + go r.sweepLoop() + return r, nil +} + +func (r *Redirector) LocalAddr() string { return r.ln.Addr().String() } + +func (r *Redirector) LocalPort() uint16 { + return uint16(r.ln.Addr().(*net.TCPAddr).Port) +} + +// SetMapping records that future TCP connections originating from +// src_port should be tunneled to dstIP:dstPort. Called by the divert +// layer at SYN time. +func (r *Redirector) SetMapping(srcPort uint16, dstIP net.IP, dstPort uint16) { + r.mu.Lock() + r.flows[srcPort] = mapping{dstIP: dstIP, dstPort: dstPort, added: time.Now()} + r.mu.Unlock() +} + +// Close stops accepting and tears down active flows. +func (r *Redirector) Close() error { + r.cnl() + err := r.ln.Close() + r.wg.Wait() + return err +} + +func (r *Redirector) acceptLoop() { + defer r.wg.Done() + for { + c, err := r.ln.Accept() + if err != nil { + return + } + r.wg.Add(1) + go r.handle(c) + } +} + +func (r *Redirector) handle(c net.Conn) { + defer r.wg.Done() + defer c.Close() + + srcPort := uint16(c.RemoteAddr().(*net.TCPAddr).Port) + r.mu.RLock() + m, ok := r.flows[srcPort] + r.mu.RUnlock() + if !ok { + return // unknown flow; drop quietly + } + + ctx, cancel := context.WithTimeout(r.ctx, 10*time.Second) + defer cancel() + + host := m.dstIP.String() + upstream, err := socks5.Dial(ctx, r.cfg.SOCKS5, host, m.dstPort) + if err != nil { + return + } + defer upstream.Close() + + pump(c, upstream) +} + +func pump(a, b net.Conn) { + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + // G1: upstream → client (read from b, write to a). + _, _ = io.Copy(a, b) + if cw, ok := a.(closeWriter); ok { + cw.CloseWrite() + } + // We are exiting — force G2's read of a to unblock so the + // pump tears down even if the peer half never closes. + _ = a.SetReadDeadline(time.Now()) + }() + go func() { + defer wg.Done() + // G2: client → upstream (read from a, write to b). + _, _ = io.Copy(b, a) + if cw, ok := b.(closeWriter); ok { + cw.CloseWrite() + } + _ = b.SetReadDeadline(time.Now()) + }() + wg.Wait() +} + +type closeWriter interface{ CloseWrite() error } + +// sweepLoop removes mappings older than 30 minutes (T-6 in spec). +func (r *Redirector) sweepLoop() { + defer r.wg.Done() + tk := time.NewTicker(time.Minute) + defer tk.Stop() + for { + select { + case <-r.ctx.Done(): + return + case <-tk.C: + cutoff := time.Now().Add(-30 * time.Minute) + r.mu.Lock() + for k, m := range r.flows { + if m.added.Before(cutoff) { + delete(r.flows, k) + } + } + r.mu.Unlock() + } + } +} + +// Sentinel for callers. +var ErrNotMapped = errors.New("redirector: source port has no mapping") diff --git a/internal/redirect/tcp_test.go b/internal/redirect/tcp_test.go new file mode 100644 index 0000000..2ff86ad --- /dev/null +++ b/internal/redirect/tcp_test.go @@ -0,0 +1,139 @@ +package redirect + +import ( + "context" + "io" + "net" + "sync" + "testing" + "time" + + "git.okcu.io/root/drover-go/internal/socks5" + "github.com/stretchr/testify/require" +) + +// startEchoListener spins up a TCP server that echoes whatever it reads. +func startEchoListener(t *testing.T) string { + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { ln.Close() }) + go func() { + for { + c, err := ln.Accept() + if err != nil { + return + } + go func(c net.Conn) { + defer c.Close() + io.Copy(c, c) + }(c) + } + }() + return ln.Addr().String() +} + +// startFakeSOCKS5 returns the addr of a no-auth SOCKS5 server that +// CONNECT-tunnels to the requested host:port. (Borrowed pattern from +// internal/socks5/client_test.go.) +func startFakeSOCKS5(t *testing.T) string { + ln, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + t.Cleanup(func() { ln.Close() }) + go func() { + for { + c, err := ln.Accept() + if err != nil { + return + } + go func(c net.Conn) { + defer c.Close() + buf := make([]byte, 256) + // Greet + io.ReadFull(c, buf[:2]) + nm := int(buf[1]) + io.ReadFull(c, buf[:nm]) + c.Write([]byte{0x05, 0x00}) + // CONNECT + io.ReadFull(c, buf[:4]) + atyp := buf[3] + var host string + switch atyp { + case 1: + io.ReadFull(c, buf[:4]) + host = net.IPv4(buf[0], buf[1], buf[2], buf[3]).String() + case 3: + io.ReadFull(c, buf[:1]) + hl := int(buf[0]) + io.ReadFull(c, buf[:hl]) + host = string(buf[:hl]) + } + io.ReadFull(c, buf[:2]) + port := int(buf[0])<<8 | int(buf[1]) + c.Write([]byte{0x05, 0x00, 0x00, 0x01, 0, 0, 0, 0, 0, 0}) + up, err := net.Dial("tcp", net.JoinHostPort(host, sportItoa(port))) + if err != nil { + return + } + defer up.Close() + var wg sync.WaitGroup + wg.Add(2) + go func() { defer wg.Done(); io.Copy(up, c) }() + go func() { defer wg.Done(); io.Copy(c, up) }() + wg.Wait() + }(c) + } + }() + return ln.Addr().String() +} + +func sportItoa(n int) string { + if n == 0 { + return "0" + } + out := []byte{} + for n > 0 { + out = append([]byte{byte('0' + n%10)}, out...) + n /= 10 + } + return string(out) +} + +func TestRedirector_PipesEcho(t *testing.T) { + echoAddr := startEchoListener(t) + echoHost, echoPortStr, _ := net.SplitHostPort(echoAddr) + echoPort := parseU16(echoPortStr) + socksAddr := startFakeSOCKS5(t) + + r, err := New(Config{ + SOCKS5: socks5.Config{ProxyAddr: socksAddr}, + Bind: "127.0.0.1:0", + }) + require.NoError(t, err) + t.Cleanup(func() { r.Close() }) + + // Manually map: pretend a packet from src_port=12345 was destined to echo. + r.SetMapping(12345, net.ParseIP(echoHost), echoPort) + + // Dial the redirector listener using src_port=12345 so it looks + // up the mapping correctly. + d := net.Dialer{LocalAddr: &net.TCPAddr{Port: 12345}} + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + conn, err := d.DialContext(ctx, "tcp", r.LocalAddr()) + require.NoError(t, err) + defer conn.Close() + + conn.Write([]byte("ping")) + conn.SetReadDeadline(time.Now().Add(time.Second)) + buf := make([]byte, 4) + io.ReadFull(conn, buf) + require.Equal(t, "ping", string(buf)) +} + +func parseU16(s string) uint16 { + var n int + for _, c := range s { + n = n*10 + int(c-'0') + } + return uint16(n) +}