Files
drover-go/internal/engine/engine.go
T
root 4074e68715 experimental/windivert: P2.1+P2.2 with WinDivert NETWORK+SOCKET layers
WIP snapshot before pivot to sing-box+TUN. Reached:
- TCP redirect via streamdump pattern (swap+Outbound=0+reinject)
- SOCKET layer for SYN-stage flow detection (avoids FLOW Establish-too-late race)
- Lazy PID→name resolution (catches Update.exe inside procscan tick)
- UDP forward via SOCKS5 UDP ASSOCIATE relay + manual reinject
- Result: chat works, voice times out (Discord IP discovery / RTC handshake fails)

Reason for pivot: WinDivert NAT-reinject pattern has subtle layer-3
semantics issues that DLL-injection / TUN-based proxies sidestep
entirely. Going with embedded sing-box + wintun as the engine —
proven path for Discord voice through SOCKS5.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 22:27:54 +03:00

668 lines
20 KiB
Go

//go:build windows

package engine

import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"git.okcu.io/root/drover-go/internal/divert"
	"git.okcu.io/root/drover-go/internal/procscan"
	"git.okcu.io/root/drover-go/internal/redirect"
	"git.okcu.io/root/drover-go/internal/socks5"
)
// Config configures the engine.
type Config struct {
	ProxyAddr string   // "host:port" of upstream SOCKS5 proxy (required; New rejects empty)
	UseAuth   bool     // enable username/password auth on the SOCKS5 proxy
	Login     string   // SOCKS5 username (used when UseAuth is true)
	Password  string   // SOCKS5 password (used when UseAuth is true)
	Targets   []string // exe basenames to capture (Discord.exe etc)
}
// Engine is the orchestrator. Use New + Start/Stop.
type Engine struct {
	cfg     Config
	mu      sync.Mutex // guards status + lastErr
	status  Status
	lastErr error

	// runtime state
	upstreamIP net.IP
	localIP    net.IP // primary outbound LAN IP — listener binds here so reinjected NAT'd packets reach it (kernel drops src=LAN/dst=127.0.0.1 as spoofed)

	handleMu sync.RWMutex   // guards handle + flowH swap during procscan rebuild
	handle   *divert.Handle // NETWORK layer: capture/rewrite/reinject packets
	flowH    *divert.Handle // SOCKET layer: capture-ALL events (filter "true"); we filter by PID in-process

	redir *redirect.Redirector
	udp   *redirect.UDPProxy // SOCKS5 UDP relay manager — handles Discord voice etc.

	ctx    context.Context
	cnl    context.CancelFunc
	wg     sync.WaitGroup
	ownPID uint32 // our own PID, excluded from capture filters

	// pidMu guards targetPIDs. Updated by procscanLoop, read by socketLoop
	// for every event. Read frequency: ~50 events/sec average; write:
	// every 2s. RWMutex contention negligible.
	pidMu      sync.RWMutex
	targetPIDs map[uint32]struct{}

	// flowSet tracks flows currently belonging to target processes.
	// Populated by socketLoop on SocketKindConnect, removed on
	// SocketKindClose. Read by diverterLoop on every captured packet
	// to decide whether to redirect or pass through.
	flowMu  sync.RWMutex
	flowSet map[flowKey]struct{}
}
// flowKey identifies a tracked flow, keyed from the outbound
// direction: a SOCKET event's SrcPort/DstAddr/DstPort match an
// outbound packet's SrcPort/DstAddr/DstPort, so socketLoop and
// diverterLoop build identical keys for the same flow.
//
// We deliberately omit SrcIP from the key: when Discord (or any
// client) binds a UDP/TCP socket to INADDR_ANY (0.0.0.0), the SOCKET
// layer reports src=0.0.0.0, but the actual outbound packet has
// src=<local_LAN_IP> (kernel fills the interface address). Including
// src in the key would cause those flows to miss the lookup. Source
// port + destination + proto is a sufficient discriminator on a
// single host.
type flowKey struct {
	dst   [4]byte // destination IPv4 address
	sport uint16  // source (local) port
	dport uint16  // destination (remote) port
	proto uint8   // 6=TCP, 17=UDP
}
// New constructs an engine. No I/O yet.
func New(cfg Config) (*Engine, error) {
	if cfg.ProxyAddr == "" {
		return nil, errors.New("ProxyAddr is required")
	}
	eng := &Engine{
		cfg:        cfg,
		status:     StatusIdle,
		ownPID:     uint32(os.Getpid()),
		flowSet:    make(map[flowKey]struct{}),
		targetPIDs: make(map[uint32]struct{}),
	}
	return eng, nil
}
// Status returns the current engine status (cheap, no I/O).
func (e *Engine) Status() Status {
	e.mu.Lock()
	s := e.status
	e.mu.Unlock()
	return s
}
// LastError returns the last error that pushed us to Failed (or nil).
func (e *Engine) LastError() error {
	e.mu.Lock()
	err := e.lastErr
	e.mu.Unlock()
	return err
}
// transition moves the engine to status `to`, recording err (when
// non-nil) as lastErr. Reaching Active or Idle cleanly clears lastErr.
// Invalid transitions are logged rather than panicked on.
func (e *Engine) transition(to Status, err error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if !isValidTransition(e.status, to) {
		// Permissive: log but don't panic in production; most invalid
		// transitions are programming errors caught by the state test.
		// (Previously this branch was empty — the promised log never
		// happened, hiding state-machine bugs.)
		log.Printf("engine: invalid status transition %s → %s", e.status, to)
	}
	e.status = to
	if err != nil {
		e.lastErr = err
	} else if to == StatusActive || to == StatusIdle {
		e.lastErr = nil
	}
}
// Start brings the engine to Active. Returns nil even when transition
// to Failed happens — caller checks Status afterwards. The provided
// ctx is honoured for the bring-up sequence (proxy resolve, driver
// install, handle open, etc).
func (e *Engine) Start(ctx context.Context) error {
	e.mu.Lock()
	switch e.status {
	case StatusIdle, StatusFailed:
		e.status = StatusStarting
		e.mu.Unlock()
	default:
		got := e.status
		e.mu.Unlock()
		return fmt.Errorf("Start requires Idle or Failed; got %s", got)
	}
	if err := e.bringUp(ctx); err != nil {
		e.transition(StatusFailed, err)
		return err
	}
	e.transition(StatusActive, nil)
	return nil
}
// bringUp performs the full activation sequence: resolve the upstream
// proxy, detect the outbound LAN IP, install the WinDivert driver,
// take an initial process snapshot, open the TCP redirector, the
// SOCKET and NETWORK WinDivert handles and the UDP proxy, then spawn
// the three worker goroutines. On any error it closes whatever it
// already opened and returns; Start maps the error to StatusFailed.
func (e *Engine) bringUp(ctx context.Context) error {
	log.Printf("engine: bringUp start cfg.ProxyAddr=%q targets=%v", e.cfg.ProxyAddr, e.cfg.Targets)
	// 1. Resolve upstream
	host, _, err := net.SplitHostPort(e.cfg.ProxyAddr)
	if err != nil {
		log.Printf("engine: SplitHostPort failed: %v", err)
		return fmt.Errorf("invalid ProxyAddr: %w", err)
	}
	rctx, rcancel := context.WithTimeout(ctx, 5*time.Second)
	defer rcancel()
	ips, err := net.DefaultResolver.LookupIPAddr(rctx, host)
	if err != nil || len(ips) == 0 {
		log.Printf("engine: LookupIPAddr(%q) failed: %v (ips=%v)", host, err, ips)
		return fmt.Errorf("resolve proxy host %q: %w", host, err)
	}
	// Pick the first IPv4 address — the capture/NAT path below is
	// IPv4-only.
	var upstream net.IP
	for _, a := range ips {
		if v4 := a.IP.To4(); v4 != nil {
			upstream = v4
			break
		}
	}
	if upstream == nil {
		log.Printf("engine: no IPv4 for %q (got %v)", host, ips)
		return fmt.Errorf("no IPv4 for %q", host)
	}
	e.upstreamIP = upstream
	log.Printf("engine: upstream resolved %s → %s", host, upstream)
	// 1b. Detect outbound LAN IP — listener binds here. Trick:
	// open a UDP "connect" to any external IP; kernel picks the
	// outbound interface and we read LocalAddr off the conn.
	// (No packet is sent — UDP Dial only selects a route.)
	if udpConn, dErr := net.Dial("udp", "8.8.8.8:53"); dErr == nil {
		e.localIP = udpConn.LocalAddr().(*net.UDPAddr).IP.To4()
		udpConn.Close()
	}
	if e.localIP == nil {
		log.Printf("engine: could not detect local LAN IP — falling back to 127.0.0.1 (may not work)")
		e.localIP = net.IPv4(127, 0, 0, 1)
	}
	log.Printf("engine: local LAN IP = %s", e.localIP)
	// 2. Driver install (idempotent)
	paths, err := divert.InstallDriver()
	if err != nil {
		log.Printf("engine: InstallDriver failed: %v", err)
		return fmt.Errorf("install driver: %w", err)
	}
	log.Printf("engine: driver installed sys=%s dll=%s", paths.SysPath, paths.DllPath)
	// 3. Initial procscan
	pids, err := procscan.Snapshot(e.cfg.Targets)
	if err != nil {
		log.Printf("engine: procscan.Snapshot failed: %v", err)
		return fmt.Errorf("procscan: %w", err)
	}
	pidList := make([]uint32, 0, len(pids))
	for p := range pids {
		pidList = append(pidList, p)
	}
	log.Printf("engine: initial procscan found %d target pids: %v", len(pidList), pids)
	// 4. Open redirector listener on 0.0.0.0 so it accepts on any
	// interface (including the LAN IP we'll target with the swap-and-
	// reinject NAT pattern). After the streamdump swap the packet has
	// dst=LAN_IP:listener_port — kernel delivers via inbound path of
	// the LAN interface; listener accepts it as a regular TCP conn.
	r, err := redirect.New(redirect.Config{
		SOCKS5: socks5.Config{
			ProxyAddr: e.cfg.ProxyAddr,
			UseAuth:   e.cfg.UseAuth,
			Login:     e.cfg.Login,
			Password:  e.cfg.Password,
		},
		Bind: "0.0.0.0:0",
	})
	if err != nil {
		log.Printf("engine: redirect.New failed: %v", err)
		return fmt.Errorf("redirector: %w", err)
	}
	e.redir = r
	log.Printf("engine: redirector listening on %s", r.LocalAddr())
	// 5. Build filters
	// SOCKET handle uses "true" — capture ALL socket events. We
	// filter by PID in-process. SOCKET layer fires Connect events
	// SYNCHRONOUSLY with the connect() syscall, BEFORE the SYN packet
	// leaves the box — which gives socketLoop time to populate the
	// redirector mapping before NETWORK-layer SYN arrives.
	//
	// (FLOW Established events fire after the 3-way handshake, which
	// is too late for our SYN-redirect plan — by then the conn is
	// already pointing at the real target.)
	netFilter := divert.BuildNetworkFilter(divert.FilterParams{
		TargetPIDs: pidList,
		OwnPID:     e.ownPID,
		UpstreamIP: upstream.String(),
		LocalIP:    e.localIP.String(),
	})
	log.Printf("engine: socket filter: \"true\" (capture-all, PID-filter in-process)")
	log.Printf("engine: network filter: %s", netFilter)
	// Seed targetPIDs from initial procscan
	e.pidMu.Lock()
	for p := range pids {
		e.targetPIDs[p] = struct{}{}
	}
	e.pidMu.Unlock()
	// 6. Open SOCKET handle FIRST with broad filter so we never miss
	// a new connection between procscan ticks. socketLoop discards
	// events from non-target PIDs in-process.
	flowH, err := divert.OpenSocket("true")
	if err != nil {
		log.Printf("engine: divert.OpenSocket failed: %v", err)
		r.Close()
		return fmt.Errorf("WinDivert socket open: %w", err)
	}
	e.flowH = flowH
	log.Printf("engine: WinDivert SOCKET handle opened (filter=\"true\")")
	// 7. Open NETWORK handle for actual packet capture/redirect.
	netH, err := divert.Open(netFilter)
	if err != nil {
		log.Printf("engine: divert.Open(network) failed: %v", err)
		flowH.Close()
		r.Close()
		return fmt.Errorf("WinDivert network open: %w", err)
	}
	e.handle = netH
	log.Printf("engine: WinDivert NETWORK handle opened")
	// 7b. UDP proxy. The SOCKS5 UDP ASSOCIATE control conn is opened
	// lazily on the first UDP packet from a target, so this New call
	// is non-blocking — no upstream I/O happens here.
	udpProxy, err := redirect.NewUDP(redirect.UDPConfig{
		SOCKS5: socks5.Config{
			ProxyAddr: e.cfg.ProxyAddr,
			UseAuth:   e.cfg.UseAuth,
			Login:     e.cfg.Login,
			Password:  e.cfg.Password,
		},
		LocalIP:   e.localIP,
		Injector:  divertHandleInjector{h: netH},
		LogPrefix: "engine udp: ",
	})
	if err != nil {
		log.Printf("engine: redirect.NewUDP failed: %v", err)
		netH.Close()
		flowH.Close()
		r.Close()
		return fmt.Errorf("udp proxy: %w", err)
	}
	e.udp = udpProxy
	log.Printf("engine: UDP proxy ready (lazy SOCKS5 ASSOCIATE)")
	// 8. Spawn socket tracker + divert reader + procscan ticker.
	// Fields above are written without handleMu: the goroutines that
	// read them under the lock start only after this point.
	e.ctx, e.cnl = context.WithCancel(context.Background())
	e.wg.Add(3)
	go e.socketLoop()
	go e.diverterLoop()
	go e.procscanLoop()
	log.Printf("engine: bringUp complete, transitioning to Active")
	return nil
}
// divertHandleInjector adapts *divert.Handle to redirect.UDPInjector.
// We expose Send through the SendInjectInbound helper which sets the
// right WinDivert flags for fabricated inbound packets.
type divertHandleInjector struct {
	h *divert.Handle // NETWORK-layer handle shared with diverterLoop
}

// Send injects buf as a fabricated inbound packet. The address
// argument is ignored: we only ever inject UDP via this path (TCP
// path uses the captured addr directly in diverterLoop).
func (d divertHandleInjector) Send(buf []byte, _ redirect.UDPInjectAddr) (int, error) {
	return d.h.SendInjectInbound(buf, true /* isUDP */)
}
// Stop tears down. Always returns to Idle (or stays in Idle if
// already there).
//
// Teardown order matters: cancel the context first so the loops stop
// retrying, then close handles/proxies (closing the WinDivert handles
// makes a blocked Recv return an error the loops treat as terminal),
// then wait for all goroutines before nil-ing the fields.
func (e *Engine) Stop() error {
	e.mu.Lock()
	if e.status == StatusIdle {
		e.mu.Unlock()
		return nil
	}
	e.mu.Unlock()
	// cnl is nil if bringUp never reached the goroutine-spawn step.
	if e.cnl != nil {
		e.cnl()
	}
	if e.handle != nil {
		e.handle.Close()
	}
	if e.flowH != nil {
		e.flowH.Close()
	}
	if e.udp != nil {
		e.udp.Close()
	}
	if e.redir != nil {
		e.redir.Close()
	}
	e.wg.Wait()
	// Safe to write without handleMu: every goroutine that reads these
	// fields has exited (wg.Wait above).
	e.handle = nil
	e.flowH = nil
	e.redir = nil
	e.udp = nil
	e.transition(StatusIdle, nil)
	return nil
}
// socketLoop reads socket-layer events (Connect/Close) from the
// SOCKET handle and maintains e.flowSet + the redirector mapping.
//
// SOCKET Connect fires synchronously with the connect() syscall on
// the originating thread, BEFORE the SYN packet is dispatched. This
// is the critical window: by populating flowSet+mapping in this
// handler, the diverterLoop's NETWORK capture of the SYN finds the
// target on first lookup and redirects correctly.
func (e *Engine) socketLoop() {
	defer e.wg.Done()
	log.Printf("engine: socketLoop started")
	iter := 0
	for {
		select {
		case <-e.ctx.Done():
			log.Printf("engine: socketLoop ctx done after %d iterations", iter)
			return
		default:
		}
		iter++
		e.handleMu.RLock()
		h := e.flowH
		e.handleMu.RUnlock()
		if h == nil {
			time.Sleep(50 * time.Millisecond)
			continue
		}
		ev, err := h.RecvSocket()
		if err != nil {
			if errors.Is(err, divert.ErrShutdown) || errors.Is(err, divert.ErrInvalidHandle) {
				log.Printf("engine: socketLoop terminal error after %d iterations: %v", iter, err)
				return
			}
			log.Printf("engine: socketLoop transient error (iter %d): %v", iter, err)
			time.Sleep(100 * time.Millisecond)
			continue
		}
		// In-process PID filter. Fast path: PID is in the set procscan
		// fed us. Slow path: PID isn't yet known (Update.exe spawn →
		// connect → exit routinely fits inside the 2-second procscan
		// tick), so resolve PID → exe name on demand and admit it if
		// the name matches our Targets list. This is what makes
		// "Checking for updates" finish in ~5 s instead of 30+.
		e.pidMu.RLock()
		_, isTarget := e.targetPIDs[ev.ProcessID]
		e.pidMu.RUnlock()
		if !isTarget {
			if name, err := procscan.ResolvePID(ev.ProcessID); err == nil {
				for _, t := range e.cfg.Targets {
					// EqualFold is case-insensitive by itself; the
					// previous extra ToLower+second-compare was
					// redundant.
					if strings.EqualFold(t, name) {
						isTarget = true
						e.pidMu.Lock()
						e.targetPIDs[ev.ProcessID] = struct{}{}
						e.pidMu.Unlock()
						log.Printf("engine: lazy-admit pid=%d name=%s (matched target)", ev.ProcessID, name)
						break
					}
				}
			}
			if !isTarget {
				continue
			}
		}
		switch ev.Kind {
		case divert.SocketKindConnect:
			// Connect fires before SYN. Populate redirector mapping +
			// flowSet so when SYN arrives at NETWORK layer the
			// diverterLoop knows to redirect.
			key := flowKey{
				dst:   ev.DstAddr,
				sport: ev.SrcPort,
				dport: ev.DstPort,
				proto: ev.Protocol,
			}
			e.flowMu.Lock()
			e.flowSet[key] = struct{}{}
			setSize := len(e.flowSet)
			e.flowMu.Unlock()
			e.redir.SetMapping(ev.SrcPort, net.IPv4(ev.DstAddr[0], ev.DstAddr[1], ev.DstAddr[2], ev.DstAddr[3]), ev.DstPort)
			log.Printf("engine: socket connect pid=%d proto=%d %v:%d → %v:%d (set size=%d)",
				ev.ProcessID, ev.Protocol, ev.SrcAddr, ev.SrcPort, ev.DstAddr, ev.DstPort, setSize)
		case divert.SocketKindClose:
			key := flowKey{
				dst:   ev.DstAddr,
				sport: ev.SrcPort,
				dport: ev.DstPort,
				proto: ev.Protocol,
			}
			e.flowMu.Lock()
			delete(e.flowSet, key)
			e.flowMu.Unlock()
		}
	}
}
// diverterLoop reads packets from the NETWORK-layer handle and decides
// per packet: target UDP is forwarded through the SOCKS5 UDP relay
// (consumed), target TCP gets the streamdump swap toward the local
// listener, listener→client TCP gets the reverse swap, and everything
// else is reinjected unmodified.
func (e *Engine) diverterLoop() {
	defer e.wg.Done()
	log.Printf("engine: diverterLoop started")
	buf := make([]byte, 65536)
	listenerPort := e.redir.LocalPort()
	// localIP is fixed after bringUp — hoist the To4() conversion out
	// of the per-packet path.
	localV4 := e.localIP.To4()
	// Atomic counters: incremented in this loop, read concurrently by
	// the stats goroutine below (plain int64s here were a data race).
	var rxCount, redirCount atomic.Int64
	statusTk := time.NewTicker(5 * time.Second)
	defer statusTk.Stop()
	go func() {
		for {
			// Wait on the ticker OR cancellation. A plain
			// `for range statusTk.C` leaked this goroutine after
			// Stop(): once the ticker is stopped, its channel never
			// fires and never closes, so the range blocked forever.
			select {
			case <-e.ctx.Done():
				return
			case <-statusTk.C:
			}
			var udpFwd, udpFwdBytes, udpRecv, udpInj uint64
			if e.udp != nil {
				udpFwd, udpFwdBytes, udpRecv, udpInj = e.udp.Stats()
			}
			// flowSet is mutated by socketLoop — take the read lock
			// even for a len() snapshot.
			e.flowMu.RLock()
			flows := len(e.flowSet)
			e.flowMu.RUnlock()
			log.Printf("engine: diverter stats rx=%d tcpRedir=%d flowSet=%d | UDP fwd=%d/%dB recv=%d injected=%d",
				rxCount.Load(), redirCount.Load(), flows, udpFwd, udpFwdBytes, udpRecv, udpInj)
		}
	}()
	for {
		select {
		case <-e.ctx.Done():
			return
		default:
		}
		e.handleMu.RLock()
		h := e.handle
		e.handleMu.RUnlock()
		if h == nil {
			time.Sleep(50 * time.Millisecond)
			continue
		}
		n, addr, err := h.Recv(buf)
		if err != nil {
			if errors.Is(err, divert.ErrShutdown) || errors.Is(err, divert.ErrInvalidHandle) {
				log.Printf("engine: diverterLoop terminal error after %d rx: %v", rxCount.Load(), err)
				e.transition(StatusFailed, err)
				return
			}
			log.Printf("engine: diverterLoop transient Recv error: %v", err)
			continue
		}
		rxCount.Add(1)
		// === UDP fast path ===
		// Quick header sniff: version nibble 4 + protocol byte 17
		// means IPv4/UDP. Target UDP flows are forwarded through the
		// SOCKS5 UDP relay (consumed — NOT reinjected); non-target
		// UDP is passed through unmodified.
		if n >= 10 && buf[0]>>4 == 4 && buf[9] == 17 {
			udpInfo, uerr := divert.ParseIPv4UDP(buf[:n])
			if uerr == nil {
				var ukey flowKey
				copy(ukey.dst[:], udpInfo.DstIP.To4())
				ukey.sport = udpInfo.SrcPort
				ukey.dport = udpInfo.DstPort
				ukey.proto = 17
				e.flowMu.RLock()
				_, isUDPTarget := e.flowSet[ukey]
				e.flowMu.RUnlock()
				if isUDPTarget && e.udp != nil {
					// Strip IPv4 + UDP headers; the rest is
					// application payload handed to the SOCKS5 UDP
					// relay.
					payload := buf[udpInfo.IHL+8 : n]
					if ferr := e.udp.Forward(udpInfo.SrcIP, udpInfo.SrcPort, udpInfo.DstIP, udpInfo.DstPort, payload); ferr != nil {
						log.Printf("engine: udp forward error %v:%d → %v:%d: %v",
							udpInfo.SrcIP, udpInfo.SrcPort, udpInfo.DstIP, udpInfo.DstPort, ferr)
						// Drop on error — UDP loss is acceptable.
					} else {
						redirCount.Add(1)
					}
					// Consumed: do NOT reinject — relay reader will
					// fabricate the inbound reply.
					continue
				}
				// Non-target UDP: pass through unmodified.
				_, _ = h.Send(buf[:n], addr)
				continue
			}
			// Malformed UDP — fall through to TCP parse path (which
			// will also fail and reinject).
		}
		// Parse + decide
		info, err := divert.ParseIPv4TCP(buf[:n])
		if err != nil {
			// Not IPv4-TCP and not handled by UDP path above. Reinject
			// as-is so non-target traffic continues normally.
			_, _ = h.Send(buf[:n], addr)
			continue
		}
		srcV4 := info.SrcIP.To4()
		// === RETURN path === : packet emitted by our listener back to
		// the client. SrcIP=local LAN, SrcPort=listener_port. We swap
		// IPs and rewrite SrcPort to the original target port so the
		// client (Discord) sees a response that matches its connect()
		// pair (src=real_target:real_port, dst=local_IP:client_eph).
		if info.SrcPort == listenerPort && srcV4 != nil && srcV4.Equal(localV4) {
			realIP, realPort, ok := e.redir.GetMapping(info.DstPort)
			if !ok {
				// No mapping — should be rare (TTL evicted?). Reinject
				// as-is; client will retransmit.
				_, _ = h.Send(buf[:n], addr)
				continue
			}
			// Only the original target PORT is needed here:
			// SwapAndSetSrcPort swaps src↔dst and sets the source port
			// to realPort; the mapping's IP is unused on this path.
			_ = realIP
			if err := divert.SwapAndSetSrcPort(buf[:n], realPort); err == nil {
				addr.Flags &^= 0x02 // clear Outbound (deliver as inbound)
				addr.Flags |= 0x60  // signal IP+TCP checksums valid
				_, _ = h.Send(buf[:n], addr)
				redirCount.Add(1)
			}
			continue
		}
		// === FORWARD path === : packet from a target process to a
		// remote. Apply streamdump swap so the kernel delivers it to
		// our listener via the inbound path.
		var key flowKey
		copy(key.dst[:], info.DstIP.To4())
		key.sport = info.SrcPort
		key.dport = info.DstPort
		key.proto = 6
		e.flowMu.RLock()
		_, isTarget := e.flowSet[key]
		e.flowMu.RUnlock()
		if !isTarget {
			_, _ = h.Send(buf[:n], addr)
			continue
		}
		// Target flow: refresh redirector mapping and apply the
		// canonical streamdump swap (swap src↔dst, dst.port=listener,
		// addr.Outbound=0, mark checksums valid).
		e.redir.SetMapping(info.SrcPort, info.DstIP, info.DstPort)
		if err := divert.SwapAndSetDstPort(buf[:n], listenerPort); err == nil {
			addr.Flags &^= 0x02 // clear Outbound (deliver as inbound)
			addr.Flags |= 0x60  // signal IP+TCP checksums valid
			_, _ = h.Send(buf[:n], addr)
			redirCount.Add(1)
		}
	}
}
// procscanLoop polls the process table every 2 seconds and keeps
// targetPIDs in sync with the live set of target processes.
func (e *Engine) procscanLoop() {
	defer e.wg.Done()
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()
	prev, _ := procscan.Snapshot(e.cfg.Targets)
	for {
		select {
		case <-e.ctx.Done():
			return
		case <-ticker.C:
		}
		snap, err := procscan.Snapshot(e.cfg.Targets)
		if err != nil {
			continue
		}
		added, removed := procscan.DiffPIDs(prev, snap)
		if len(added)+len(removed) == 0 {
			continue
		}
		log.Printf("engine: procscan delta added=%v removed=%v", added, removed)
		// Update targetPIDs map. socketLoop reads it on every event;
		// no handle reopen needed (SOCKET filter is "true").
		e.pidMu.Lock()
		for _, pid := range added {
			e.targetPIDs[pid] = struct{}{}
		}
		for _, pid := range removed {
			delete(e.targetPIDs, pid)
		}
		e.pidMu.Unlock()
		// Drop tracked flows for the removed PIDs. We don't actually
		// know which flowKey belongs to which PID (flowSet is keyed
		// by 5-tuple, not PID), so for safety just clear the whole
		// set when a target PID disappears — socket events from the
		// surviving/new PIDs will repopulate it.
		if len(removed) != 0 {
			e.flowMu.Lock()
			e.flowSet = make(map[flowKey]struct{})
			e.flowMu.Unlock()
		}
		prev = snap
	}
}