From bbe88b0f707f53fd26a92c8aa1e948160cd98e36 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 1 May 2026 20:04:09 +0300 Subject: [PATCH] internal/engine: state machine + orchestrator (P2.1 scope) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Idle → Starting → Active → Failed lifecycle. bringUp resolves upstream IP, installs the driver (idempotent), runs initial procscan, opens redirector listener, builds filter + opens WinDivert handle, then spawns the diverter reader and 2-second procscan ticker. On every outbound TCP packet from a target PID: record (src_port → real_target) mapping, rewrite dst to 127.0.0.1:listener_port, re-inject. Loopback listener picks up the connection, looks up the original target, and SOCKS5-tunnels. P2.1 scope: no Reconnecting state, no panic recovery, no UDP forwarding. Those land in P2.2/P2.3. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/engine/engine.go | 289 +++++++++++++++++++++++++++++++++ internal/engine/engine_test.go | 45 +++++ internal/engine/state.go | 29 ++++ internal/engine/state_test.go | 30 ++++ 4 files changed, 393 insertions(+) create mode 100644 internal/engine/engine.go create mode 100644 internal/engine/engine_test.go create mode 100644 internal/engine/state.go create mode 100644 internal/engine/state_test.go diff --git a/internal/engine/engine.go b/internal/engine/engine.go new file mode 100644 index 0000000..9beb120 --- /dev/null +++ b/internal/engine/engine.go @@ -0,0 +1,289 @@ +//go:build windows + +package engine + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "sync" + "time" + + "git.okcu.io/root/drover-go/internal/divert" + "git.okcu.io/root/drover-go/internal/procscan" + "git.okcu.io/root/drover-go/internal/redirect" + "git.okcu.io/root/drover-go/internal/socks5" +) + +// Config configures the engine. 
+type Config struct {
+	// ProxyAddr is the "host:port" of the upstream SOCKS5 proxy. New
+	// rejects an empty value.
+	ProxyAddr string
+	// UseAuth, Login and Password are copied into the SOCKS5 client
+	// configuration of the redirector.
+	UseAuth  bool
+	Login    string
+	Password string
+	// Targets lists the exe basenames to capture (Discord.exe etc).
+	Targets []string
+}
+
+// Engine is the orchestrator. Use New + Start/Stop.
+type Engine struct {
+	cfg Config
+
+	// mu guards status and lastErr.
+	mu      sync.Mutex
+	status  Status
+	lastErr error
+
+	// Runtime state, populated during bring-up and released by Stop.
+
+	// upstreamIP is the IPv4 address the proxy host resolved to.
+	upstreamIP net.IP
+	// handle is the WinDivert handle currently in use.
+	handle *divert.Handle
+	// redir is the loopback redirector that receives rewritten flows.
+	redir *redirect.Redirector
+	// ctx is cancelled through cnl to stop the background loops.
+	ctx context.Context
+	cnl context.CancelFunc
+	// wg tracks the diverter and procscan goroutines.
+	wg sync.WaitGroup
+	// ownPID is os.Getpid() at construction time; it is passed to the
+	// divert filter builder.
+	ownPID uint32
+}
+
+// New constructs an engine. No I/O yet.
+//
+// It fails only when cfg.ProxyAddr is empty. The returned engine is in
+// StatusIdle and holds its own copy of cfg.Targets, so later changes to
+// the caller's slice are not observed.
+func New(cfg Config) (*Engine, error) {
+	if cfg.ProxyAddr == "" {
+		return nil, errors.New("ProxyAddr is required")
+	}
+	// The procscan loop re-reads the target list from its own goroutine
+	// on every tick; copying here keeps it from aliasing caller memory.
+	cfg.Targets = append([]string(nil), cfg.Targets...)
+	return &Engine{
+		cfg:    cfg,
+		status: StatusIdle,
+		ownPID: uint32(os.Getpid()),
+	}, nil
+}
+
+// Status returns the current engine status (cheap, no I/O).
+func (e *Engine) Status() Status {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	return e.status
+}
+
+// LastError returns the last error that pushed us to Failed (or nil).
+// It is cleared when the engine reaches Active or Idle without an error.
+func (e *Engine) LastError() error {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	return e.lastErr
+}
+
+// transition moves the engine to status to and updates lastErr: a
+// non-nil err is recorded, and reaching Active or Idle without an error
+// clears the previous one.
+//
+// Transitions rejected by isValidTransition are still applied. They
+// indicate a programming error, so they are reported on stderr instead
+// of panicking.
+func (e *Engine) transition(to Status, err error) {
+	e.mu.Lock()
+	from := e.status
+	e.status = to
+	switch {
+	case err != nil:
+		e.lastErr = err
+	case to == StatusActive || to == StatusIdle:
+		e.lastErr = nil
+	}
+	e.mu.Unlock()
+
+	// Report outside the critical section so that a slow or blocked
+	// stderr cannot stall Status/LastError callers.
+	if !isValidTransition(from, to) {
+		fmt.Fprintf(os.Stderr, "engine: unexpected state transition %s -> %s\n", from, to)
+	}
+}
+
+// Start brings the engine to Active. It may only be called while the
+// engine is Idle or Failed. If bring-up fails, the engine moves to
+// Failed and the bring-up error is returned (it is also available via
+// LastError). ctx applies to the bring-up sequence; at minimum it
+// bounds the proxy host lookup.
+func (e *Engine) Start(ctx context.Context) error {
+	e.mu.Lock()
+	if prev := e.status; prev != StatusIdle && prev != StatusFailed {
+		e.mu.Unlock()
+		// Format from the copy taken under the lock; reading e.status
+		// after Unlock would race with transition.
+		return fmt.Errorf("Start requires Idle or Failed; got %s", prev)
+	}
+	e.status = StatusStarting
+	e.mu.Unlock()
+
+	// A run that ended in Failed may still own goroutines, a WinDivert
+	// handle or a redirector. Release them before bringUp overwrites
+	// the fields that reference them. This is a no-op after Stop or on
+	// a fresh engine.
+	e.teardown()
+
+	if err := e.bringUp(ctx); err != nil {
+		e.transition(StatusFailed, err)
+		return err
+	}
+	e.transition(StatusActive, nil)
+	return nil
+}
+
+// bringUp performs the start-up sequence: resolve the proxy host,
+// install the driver, snapshot the target PIDs, open the redirector,
+// open the WinDivert handle and spawn the two background loops.
+//
+// Nothing is stored on the engine until every step has succeeded, so a
+// failed bring-up leaves no half-initialised runtime state behind.
+func (e *Engine) bringUp(ctx context.Context) error {
+	// 1. Resolve upstream
+	host, _, err := net.SplitHostPort(e.cfg.ProxyAddr)
+	if err != nil {
+		return fmt.Errorf("invalid ProxyAddr: %w", err)
+	}
+	upstream, err := resolveUpstreamIPv4(ctx, host)
+	if err != nil {
+		return err
+	}
+
+	// 2. Driver install (idempotent). InstallDriver takes no context,
+	// so check for cancellation before starting it.
+	if err := ctx.Err(); err != nil {
+		return fmt.Errorf("bring-up aborted before driver install: %w", err)
+	}
+	if _, err := divert.InstallDriver(); err != nil {
+		return fmt.Errorf("install driver: %w", err)
+	}
+
+	// 3. Initial procscan
+	pids, err := procscan.Snapshot(e.cfg.Targets)
+	if err != nil {
+		return fmt.Errorf("procscan: %w", err)
+	}
+	pidList := make([]uint32, 0, len(pids))
+	for p := range pids {
+		pidList = append(pidList, p)
+	}
+
+	// 4. Open redirector listener
+	r, err := redirect.New(redirect.Config{
+		SOCKS5: socks5.Config{
+			ProxyAddr: e.cfg.ProxyAddr,
+			UseAuth:   e.cfg.UseAuth,
+			Login:     e.cfg.Login,
+			Password:  e.cfg.Password,
+		},
+		Bind: "127.0.0.1:0",
+	})
+	if err != nil {
+		return fmt.Errorf("redirector: %w", err)
+	}
+
+	// 5. Build filter + open handle
+	filter := divert.BuildFilter(divert.FilterParams{
+		TargetPIDs: pidList,
+		OwnPID:     e.ownPID,
+		UpstreamIP: upstream.String(),
+	})
+	h, err := divert.Open(filter)
+	if err != nil {
+		// r was never published on the engine, so this is its only
+		// Close and Stop cannot close it a second time.
+		r.Close()
+		return fmt.Errorf("WinDivert open: %w", err)
+	}
+
+	// 6. Publish the runtime state, then spawn divert reader +
+	// procscan ticker.
+	runCtx, cancel := context.WithCancel(context.Background())
+	e.mu.Lock()
+	e.upstreamIP, e.redir, e.handle = upstream, r, h
+	e.ctx, e.cnl = runCtx, cancel
+	e.mu.Unlock()
+
+	e.wg.Add(2)
+	go e.diverterLoop()
+	go e.procscanLoop()
+	return nil
+}
+
+// resolveUpstreamIPv4 looks host up with a five-second timeout derived
+// from ctx and returns the first IPv4 address in the answer. A lookup
+// that succeeds without any IPv4 address yields a "no IPv4" error.
+func resolveUpstreamIPv4(ctx context.Context, host string) (net.IP, error) {
+	rctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+
+	addrs, err := net.DefaultResolver.LookupIPAddr(rctx, host)
+	if err != nil {
+		return nil, fmt.Errorf("resolve proxy host %q: %w", host, err)
+	}
+	for _, a := range addrs {
+		if v4 := a.IP.To4(); v4 != nil {
+			return v4, nil
+		}
+	}
+	return nil, fmt.Errorf("no IPv4 for %q", host)
+}
+
+// teardown stops the background loops and releases the WinDivert handle
+// and the redirector. It is safe to call when nothing is running.
+func (e *Engine) teardown() {
+	e.mu.Lock()
+	cancel, h, r := e.cnl, e.handle, e.redir
+	e.cnl, e.handle, e.redir = nil, nil, nil
+	if cancel != nil {
+		// Cancel while still holding mu: procscanLoop checks the
+		// context under the same lock before installing a replacement
+		// handle, so it cannot publish one that nobody would close.
+		cancel()
+	}
+	e.mu.Unlock()
+
+	// Closing the handle is what unblocks a diverterLoop waiting in Recv.
+	if h != nil {
+		h.Close()
+	}
+	if r != nil {
+		r.Close()
+	}
+	e.wg.Wait()
+}
+
+// Stop tears down. Always returns to Idle (or stays in Idle if
+// already there). The returned error is always nil.
+//
+// NOTE(review): calling Stop while Start is still inside bringUp is not
+// coordinated — confirm callers serialise Start and Stop.
+func (e *Engine) Stop() error {
+	if e.Status() == StatusIdle {
+		return nil
+	}
+	e.teardown()
+	e.transition(StatusIdle, nil)
+	return nil
+}
+
+// diverterLoop receives packets from the WinDivert handle, records the
+// original destination for the packet's source port, rewrites the
+// destination to the loopback redirector and re-injects the packet.
+//
+// It exits when the run context is cancelled, and moves the engine to
+// Failed when the handle it reads from reports shutdown/invalid-handle
+// without having been replaced by procscanLoop.
+func (e *Engine) diverterLoop() {
+	defer e.wg.Done()
+
+	e.mu.Lock()
+	ctx, redir, h := e.ctx, e.redir, e.handle
+	e.mu.Unlock()
+	if redir == nil || h == nil {
+		// Torn down before this goroutine got to run.
+		return
+	}
+
+	buf := make([]byte, 65536)
+	loopback := net.IPv4(127, 0, 0, 1)
+	listenerPort := redir.LocalPort()
+	for {
+		if ctx.Err() != nil {
+			return
+		}
+		n, addr, err := h.Recv(buf)
+		if err != nil {
+			e.mu.Lock()
+			cur := e.handle
+			e.mu.Unlock()
+			switch {
+			case ctx.Err() != nil:
+				// The handle was closed as part of an orderly stop.
+				return
+			case cur != nil && cur != h:
+				// procscanLoop replaced the handle after a PID change
+				// and closed the old one; carry on with the new one.
+				h = cur
+			case errors.Is(err, divert.ErrShutdown) || errors.Is(err, divert.ErrInvalidHandle):
+				e.transition(StatusFailed, err)
+				return
+			}
+			continue
+		}
+		// Parse + record + rewrite
+		info, err := divert.ParseIPv4TCP(buf[:n])
+		if err != nil {
+			// Not parseable — reinject as-is. Best effort: a failed
+			// Send only loses this one packet.
+			_, _ = h.Send(buf[:n], addr)
+			continue
+		}
+		// SYN packets don't carry the full flow yet — but every
+		// outbound TCP carries src_port we can map. We always record
+		// the latest mapping, refreshing TTL on subsequent packets.
+		redir.SetMapping(info.SrcPort, info.DstIP, info.DstPort)
+
+		// Rewrite to loopback. A packet that cannot be rewritten is
+		// not re-injected.
+		if err := divert.RewriteDst(buf[:n], loopback, listenerPort); err == nil {
+			_, _ = h.Send(buf[:n], addr)
+		}
+	}
+}
+
+// procscanLoop re-scans the target processes every two seconds. When
+// the PID set changes it opens a handle with a rebuilt filter, installs
+// it as the engine's current handle and closes the previous one. If the
+// new handle cannot be opened the engine moves to Failed and the loop
+// exits.
+func (e *Engine) procscanLoop() {
+	defer e.wg.Done()
+
+	e.mu.Lock()
+	ctx, upstream := e.ctx, e.upstreamIP
+	e.mu.Unlock()
+
+	tk := time.NewTicker(2 * time.Second)
+	defer tk.Stop()
+
+	// The error is ignored on purpose: the next successful snapshot is
+	// then diffed against whatever Snapshot returned alongside it.
+	// NOTE(review): this baseline is a fresh scan, not the snapshot
+	// bringUp built the filter from. A target that appears between the
+	// two scans looks unchanged here and is not added to the filter
+	// until some later PID change — consider passing bringUp's snapshot.
+	prev, _ := procscan.Snapshot(e.cfg.Targets)
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-tk.C:
+		}
+		cur, err := procscan.Snapshot(e.cfg.Targets)
+		if err != nil {
+			continue
+		}
+		add, rem := procscan.DiffPIDs(prev, cur)
+		if len(add) == 0 && len(rem) == 0 {
+			continue
+		}
+		// Rebuild filter + reopen handle
+		pidList := make([]uint32, 0, len(cur))
+		for p := range cur {
+			pidList = append(pidList, p)
+		}
+		filter := divert.BuildFilter(divert.FilterParams{
+			TargetPIDs: pidList,
+			OwnPID:     e.ownPID,
+			UpstreamIP: upstream.String(),
+		})
+		newH, err := divert.Open(filter)
+		if err != nil {
+			e.transition(StatusFailed, fmt.Errorf("reopen handle on PID change: %w", err))
+			return
+		}
+
+		e.mu.Lock()
+		if ctx.Err() != nil {
+			// Teardown already collected the handles; this one would
+			// never be closed if it were published now.
+			e.mu.Unlock()
+			newH.Close()
+			return
+		}
+		oldH := e.handle
+		e.handle = newH
+		e.mu.Unlock()
+
+		// Close only after the swap is visible, so diverterLoop finds
+		// the replacement when its Recv on the old handle fails.
+		if oldH != nil {
+			oldH.Close()
+		}
+		prev = cur
+	}
+}
diff --git a/internal/engine/engine_test.go b/internal/engine/engine_test.go
new file mode 100644
index 0000000..2634dc6
--- /dev/null
+++ b/internal/engine/engine_test.go
@@ -0,0 +1,45 @@
+//go:build windows && integration
+
+package engine
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestEngine_StartStop_Smoke is an integration test that requires:
+//   - admin
+//   - reachable upstream SOCKS5 proxy
+//   - WinDivert v2.2.2 driver available (or auto-installed)
+//
+// Build tag: integration. Run with:
+//
+//	go test -tags integration ./internal/engine/... -run TestEngine_StartStop_Smoke
+//
+// On a clean dev box this is the canary that proves the full pipeline
+// is wired correctly.
+func TestEngine_StartStop_Smoke(t *testing.T) {
+	// explorer.exe is a safe target — nothing important gets proxied.
+	eng, err := New(Config{
+		ProxyAddr: "95.165.72.59:12334",
+		Targets:   []string{"explorer.exe"},
+	})
+	require.NoError(t, err)
+
+	startCtx, release := context.WithTimeout(context.Background(), 10*time.Second)
+	defer release()
+	require.NoError(t, eng.Start(startCtx))
+
+	// Allow up to five seconds for the engine to report Active.
+	giveUp := time.Now().Add(5 * time.Second)
+	for eng.Status() != StatusActive && time.Now().Before(giveUp) {
+		time.Sleep(50 * time.Millisecond)
+	}
+	require.Equal(t, StatusActive, eng.Status())
+
+	require.NoError(t, eng.Stop())
+	require.Equal(t, StatusIdle, eng.Status())
+}
diff --git a/internal/engine/state.go b/internal/engine/state.go
new file mode 100644
index 0000000..ccc7c42
--- /dev/null
+++ b/internal/engine/state.go
@@ -0,0 +1,29 @@
+package engine
+
+// Status names one stage of the engine lifecycle.
+type Status string
+
+const (
+	StatusIdle     Status = "idle"     // set by New; restored by Stop
+	StatusStarting Status = "starting" // set by Start while bring-up runs
+	StatusActive   Status = "active"   // bring-up completed
+	StatusFailed   Status = "failed"   // bring-up or a background loop reported an error
+	// A Reconnecting status is planned for P2.3.
+)
+
+// isValidTransition reports whether the state machine allows moving
+// from one status to another. Engine.transition consults it, but the
+// requested transition is applied even when the answer is false.
+func isValidTransition(from, to Status) bool {
+	switch to { // keyed on the destination; each case lists its legal origins
+	case StatusStarting:
+		return from == StatusIdle || from == StatusFailed
+	case StatusActive:
+		return from == StatusStarting
+	case StatusFailed:
+		return from == StatusStarting || from == StatusActive
+	case StatusIdle:
+		return from == StatusActive || from == StatusFailed
+	}
+	return false
+}
diff --git a/internal/engine/state_test.go b/internal/engine/state_test.go
new file mode 100644
index 0000000..455c32b
--- /dev/null
+++ b/internal/engine/state_test.go
@@ -0,0 +1,30 @@
+package engine
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestStatusTransitions_Valid(t *testing.T) {
+	type edge struct {
+		from, to Status
+		allowed  bool
+	}
+	table := []edge{
+		{StatusIdle, StatusStarting, true},
+		{StatusStarting, StatusActive, true},
+		{StatusStarting, StatusFailed, true},
+		{StatusActive, StatusIdle, true},     // user clicked Stop
+		{StatusActive, StatusFailed, true},   // crash
+		{StatusFailed, StatusStarting, true}, // user clicked Retry
+		// Invalid transitions
+		{StatusIdle, StatusActive, false},
+		{StatusIdle, StatusFailed, false},
+		{StatusActive, StatusStarting, false},
+	}
+	for i := range table {
+		tc := table[i]
+		assert.Equalf(t, tc.allowed, isValidTransition(tc.from, tc.to), "%s → %s", tc.from, tc.to)
+	}
+}