internal/engine: state machine + orchestrator (P2.1 scope)
Build / test (push) Failing after 30s
Build / build-windows (push) Has been skipped

Idle → Starting → Active → Failed lifecycle. bringUp resolves
upstream IP, installs the driver (idempotent), runs initial procscan,
opens redirector listener, builds filter + opens WinDivert handle,
then spawns the diverter reader and 2-second procscan ticker.

On every outbound TCP packet from a target PID: record (src_port →
real_target) mapping, rewrite dst to 127.0.0.1:listener_port,
re-inject. Loopback listener picks up the connection, looks up the
original target, and SOCKS5-tunnels.

P2.1 scope: no Reconnecting state, no panic recovery, no UDP
forwarding. Those land in P2.2/P2.3.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-01 20:04:09 +03:00
parent dd402d4fc4
commit bbe88b0f70
4 changed files with 393 additions and 0 deletions
+289
View File
@@ -0,0 +1,289 @@
//go:build windows
package engine
import (
	"context"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"sync"
	"time"

	"git.okcu.io/root/drover-go/internal/divert"
	"git.okcu.io/root/drover-go/internal/procscan"
	"git.okcu.io/root/drover-go/internal/redirect"
	"git.okcu.io/root/drover-go/internal/socks5"
)
// Config configures the engine.
type Config struct {
	ProxyAddr string // "host:port" of upstream SOCKS5 proxy
	UseAuth   bool   // whether to send credentials to the upstream proxy
	Login     string // SOCKS5 username (presumably honoured only when UseAuth is set — confirm in socks5 pkg)
	Password  string // SOCKS5 password (see Login)
	Targets   []string // exe basenames to capture (Discord.exe etc)
}
// Engine is the orchestrator. Use New + Start/Stop.
type Engine struct {
	cfg Config

	mu      sync.Mutex // guards status, lastErr (and is available for handle publication)
	status  Status
	lastErr error

	// runtime state — populated by bringUp, cleared by Stop.
	upstreamIP net.IP               // resolved IPv4 of the upstream proxy host
	handle     *divert.Handle       // WinDivert handle; procscanLoop swaps it on PID-set changes
	redir      *redirect.Redirector // loopback listener that SOCKS5-tunnels captured flows
	ctx        context.Context      // lifetime of the worker goroutines (cancelled by Stop)
	cnl        context.CancelFunc
	wg         sync.WaitGroup // tracks diverterLoop + procscanLoop
	ownPID     uint32         // this process's PID, fed into the divert filter (presumably to exclude our own traffic — confirm in divert.BuildFilter)
}
// New constructs an engine. No I/O yet.
func New(cfg Config) (*Engine, error) {
	// ProxyAddr is the one mandatory field; everything else has a
	// usable zero value.
	if cfg.ProxyAddr == "" {
		return nil, errors.New("ProxyAddr is required")
	}
	eng := &Engine{
		cfg:    cfg,
		status: StatusIdle,
		ownPID: uint32(os.Getpid()),
	}
	return eng, nil
}
// Status returns the current engine status (cheap, no I/O).
func (e *Engine) Status() Status {
	e.mu.Lock()
	s := e.status
	e.mu.Unlock()
	return s
}
// LastError returns the last error that pushed us to Failed (or nil).
func (e *Engine) LastError() error {
	e.mu.Lock()
	err := e.lastErr
	e.mu.Unlock()
	return err
}
// transition moves the engine to status `to`, recording err (if any)
// as lastErr. Reaching Active, or returning to Idle, clears lastErr.
// Invalid transitions are logged rather than panicked on: most are
// programming errors caught by the state test, and production should
// keep running. (The original check had an empty body — the promised
// log call was never emitted.)
func (e *Engine) transition(to Status, err error) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if !isValidTransition(e.status, to) {
		log.Printf("engine: invalid state transition %s -> %s", e.status, to)
	}
	e.status = to
	switch {
	case err != nil:
		e.lastErr = err
	case to == StatusActive || to == StatusIdle:
		e.lastErr = nil
	}
}
// Start brings the engine to Active. On bring-up failure the engine
// transitions to Failed and the error is returned; it also remains
// available via LastError. Start is only legal from Idle or Failed
// (retry). The provided ctx is honoured for the bring-up sequence
// (proxy resolve, driver install, handle open, etc); the worker
// goroutines run on their own context that lives until Stop.
func (e *Engine) Start(ctx context.Context) error {
	e.mu.Lock()
	// Allow a retry from Failed as well as a fresh start from Idle.
	if e.status != StatusIdle && e.status != StatusFailed {
		e.mu.Unlock()
		return fmt.Errorf("Start requires Idle or Failed; got %s", e.status)
	}
	e.status = StatusStarting
	e.mu.Unlock()
	if err := e.bringUp(ctx); err != nil {
		e.transition(StatusFailed, err)
		return err
	}
	e.transition(StatusActive, nil)
	return nil
}
// bringUp runs the Start sequence: resolve the upstream proxy, install
// the WinDivert driver, take an initial process snapshot, open the
// loopback redirector, open the WinDivert handle, then spawn the two
// worker goroutines. On error no runtime state is left assigned on e,
// so a subsequent Stop/Start sees clean state. ctx bounds only the
// bring-up work; the workers get their own context (cancelled by Stop).
func (e *Engine) bringUp(ctx context.Context) error {
	// 1. Resolve the upstream proxy host to an IPv4 address (5s budget).
	host, _, err := net.SplitHostPort(e.cfg.ProxyAddr)
	if err != nil {
		return fmt.Errorf("invalid ProxyAddr: %w", err)
	}
	rctx, rcancel := context.WithTimeout(ctx, 5*time.Second)
	defer rcancel()
	ips, err := net.DefaultResolver.LookupIPAddr(rctx, host)
	if err != nil || len(ips) == 0 {
		return fmt.Errorf("resolve proxy host %q: %w", host, err)
	}
	var upstream net.IP
	for _, a := range ips {
		if v4 := a.IP.To4(); v4 != nil {
			upstream = v4
			break
		}
	}
	if upstream == nil {
		return fmt.Errorf("no IPv4 for %q", host)
	}

	// 2. Driver install (idempotent).
	if _, err := divert.InstallDriver(); err != nil {
		return fmt.Errorf("install driver: %w", err)
	}

	// 3. Initial procscan — seed PID list for the filter.
	pids, err := procscan.Snapshot(e.cfg.Targets)
	if err != nil {
		return fmt.Errorf("procscan: %w", err)
	}
	pidList := make([]uint32, 0, len(pids))
	for p := range pids {
		pidList = append(pidList, p)
	}

	// 4. Open redirector listener on an ephemeral loopback port.
	r, err := redirect.New(redirect.Config{
		SOCKS5: socks5.Config{
			ProxyAddr: e.cfg.ProxyAddr,
			UseAuth:   e.cfg.UseAuth,
			Login:     e.cfg.Login,
			Password:  e.cfg.Password,
		},
		Bind: "127.0.0.1:0",
	})
	if err != nil {
		return fmt.Errorf("redirector: %w", err)
	}

	// 5. Build filter + open handle.
	filter := divert.BuildFilter(divert.FilterParams{
		TargetPIDs: pidList,
		OwnPID:     e.ownPID,
		UpstreamIP: upstream.String(),
	})
	h, err := divert.Open(filter)
	if err != nil {
		r.Close()
		return fmt.Errorf("WinDivert open: %w", err)
	}

	// Commit runtime state only now that every step has succeeded:
	// assigning e.redir before divert.Open would leave a stale,
	// already-closed redirector on e when Open fails.
	e.upstreamIP = upstream
	e.redir = r
	e.handle = h

	// 6. Spawn divert reader + procscan ticker. Background context:
	// the workers must outlive the Start call's ctx.
	e.ctx, e.cnl = context.WithCancel(context.Background())
	e.wg.Add(2)
	go e.diverterLoop()
	go e.procscanLoop()
	return nil
}
// Stop tears down. Always returns to Idle (or stays in Idle if
// already there).
func (e *Engine) Stop() error {
	e.mu.Lock()
	alreadyIdle := e.status == StatusIdle
	e.mu.Unlock()
	if alreadyIdle {
		return nil
	}
	// Teardown order matters: cancel the worker context first, then
	// close the handle (unblocks a parked Recv in diverterLoop), then
	// the redirector, and only then wait for the workers to drain.
	if cancel := e.cnl; cancel != nil {
		cancel()
	}
	if h := e.handle; h != nil {
		h.Close()
	}
	if r := e.redir; r != nil {
		r.Close()
	}
	e.wg.Wait()
	e.handle = nil
	e.redir = nil
	e.transition(StatusIdle, nil)
	return nil
}
// diverterLoop reads diverted outbound TCP packets, records the
// (src_port -> real_target) mapping with the redirector, rewrites the
// destination to the loopback listener, and re-injects. It exits on
// ctx cancellation or an unrecoverable handle error (-> Failed).
func (e *Engine) diverterLoop() {
	defer e.wg.Done()
	buf := make([]byte, 65536)
	listenerPort := e.redir.LocalPort()
	for {
		select {
		case <-e.ctx.Done():
			return
		default:
		}
		// Snapshot the handle under the lock: procscanLoop swaps
		// e.handle when the target PID set changes, and an
		// unsynchronized read here is a data race.
		e.mu.Lock()
		h := e.handle
		e.mu.Unlock()
		if h == nil {
			return
		}
		n, addr, err := h.Recv(buf)
		if err != nil {
			if errors.Is(err, divert.ErrShutdown) || errors.Is(err, divert.ErrInvalidHandle) {
				// A shutdown error is expected on clean Stop (ctx
				// cancelled) — don't report Failed for that.
				if e.ctx.Err() != nil {
					return
				}
				// It is also expected when procscanLoop closed the
				// handle we were reading: pick up the new one.
				e.mu.Lock()
				swapped := e.handle != h
				e.mu.Unlock()
				if swapped {
					continue
				}
				e.transition(StatusFailed, err)
				return
			}
			continue
		}
		// Parse + record + rewrite.
		info, err := divert.ParseIPv4TCP(buf[:n])
		if err != nil {
			// Not parseable — reinject as-is.
			_, _ = h.Send(buf[:n], addr)
			continue
		}
		// SYN packets don't carry the full flow yet — but every
		// outbound TCP carries src_port we can map. We always record
		// the latest mapping, refreshing TTL on subsequent packets.
		e.redir.SetMapping(info.SrcPort, info.DstIP, info.DstPort)
		// Rewrite dst to the loopback listener and re-inject.
		if err := divert.RewriteDst(buf[:n], net.IPv4(127, 0, 0, 1), listenerPort); err == nil {
			_, _ = h.Send(buf[:n], addr)
		}
	}
}
// procscanLoop re-snapshots the target processes every 2 seconds and,
// when the PID set changed, rebuilds the WinDivert filter and swaps in
// a fresh handle. The swap is published under e.mu so diverterLoop
// never races on e.handle. Exits on ctx cancellation or when the
// replacement handle cannot be opened (-> Failed).
func (e *Engine) procscanLoop() {
	defer e.wg.Done()
	tk := time.NewTicker(2 * time.Second)
	defer tk.Stop()
	// Best-effort seed: if this snapshot fails, the first tick simply
	// treats every current PID as newly added.
	prev, _ := procscan.Snapshot(e.cfg.Targets)
	for {
		select {
		case <-e.ctx.Done():
			return
		case <-tk.C:
		}
		cur, err := procscan.Snapshot(e.cfg.Targets)
		if err != nil {
			// Transient scan failure — keep the current filter.
			continue
		}
		add, rem := procscan.DiffPIDs(prev, cur)
		if len(add) == 0 && len(rem) == 0 {
			continue
		}
		// Rebuild filter + reopen handle for the new PID set.
		pidList := make([]uint32, 0, len(cur))
		for p := range cur {
			pidList = append(pidList, p)
		}
		filter := divert.BuildFilter(divert.FilterParams{
			TargetPIDs: pidList,
			OwnPID:     e.ownPID,
			UpstreamIP: e.upstreamIP.String(),
		})
		newH, err := divert.Open(filter)
		if err != nil {
			e.transition(StatusFailed, fmt.Errorf("reopen handle on PID change: %w", err))
			return
		}
		// Publish the new handle under the lock — diverterLoop reads
		// e.handle concurrently; the unsynchronized write was a data
		// race. Close the old handle afterwards to unblock any Recv
		// still parked on it.
		e.mu.Lock()
		oldH := e.handle
		e.handle = newH
		e.mu.Unlock()
		if oldH != nil {
			oldH.Close()
		}
		prev = cur
	}
}