Files
drover-go/internal/checker/checker.go
T
root 4b985bb7f0
Build / test (push) Failing after 29s
Build / build-windows (push) Has been skipped
internal/checker: 7-step Run orchestrator + integration tests
Public Run(ctx, cfg) <-chan Result streams diagnostic events for the seven
tests (tcp, greet, auth?, connect, udp, stun, api) wired through the
SOCKS5 primitives, STUN codec, retry classification and RU hints.

- Per-test attempt loop with running/passed/failed events, transient-only
  retries (per-attempt timeout treated as transient, parent ctx cancel as
  permanent), context-aware backoff sleep.
- Connection lifecycle: tcpConn shared across greet/auth/connect (closed
  and redialed on retry); separate udpConn2 control channel for UDP
  ASSOCIATE kept alive for the duration of the stun test.
- STUN-via-SOCKS5: builds 10-byte SOCKS5 UDP header + STUN binding
  request, decodes reply with ATYP-aware header strip (1/3/4).
- runAPI plugs SOCKS5 dial into http.Transport.DialContext; passes on
  HTTP 200 OR 401.
- Skip semantics: dependency-failed tests emit single skipped result;
  cancellation latches and propagates as cancelled-failed (current) +
  cancelled-skipped (remaining).
- Defaults applied to a copy of cfg; UseAuth=false suppresses any "auth"
  result entirely.

Tests: 10 TestRun_* covering happy/auth-rejected/all-rejected/
connect-refused/udp-unsupported/timeout-then-ok/cancelled-mid-flight/
defaults plus extractRawHex unit. Fake SOCKS5 proxy + UDP relay echoing
synthetic STUN binding success responses; httptest stub for API splice.

Combined coverage 84.3% (>=80% target). go test -race clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-01 16:08:36 +03:00

740 lines
20 KiB
Go

package checker
import (
"context"
"crypto/tls"
"encoding/binary"
"errors"
"fmt"
"net"
"net/http"
"regexp"
"strconv"
"time"
)
// Status represents the lifecycle state of a single test. It is a plain
// string type, so the constants below are the literal values carried in
// Result.Status.
type Status string
// Result statuses emitted on the channel.
const (
// StatusRunning is emitted by runAttempt at the start of every attempt.
StatusRunning Status = "running"
// StatusPassed is emitted when an attempt returns a nil error.
StatusPassed Status = "passed"
// StatusFailed is emitted when an attempt returns an error, and also for
// cancellation (Error="cancelled") and pre-flight configuration errors.
StatusFailed Status = "failed"
// StatusSkipped is emitted once for a test that is not run at all, either
// because a prerequisite step failed or because the run was cancelled.
StatusSkipped Status = "skipped"
)
// Result is one event in the diagnostic stream. Multiple Results may be
// emitted per test (one per attempt: running + passed/failed; on retry,
// running again then passed/failed). A skipped test emits exactly one
// Result.
type Result struct {
// ID names the test the event belongs to: "tcp", "greet", "auth",
// "connect", "udp", "stun" or "api".
ID string `json:"id"`
// Status is the lifecycle state this event reports.
Status Status `json:"status"`
// Metric is a short success detail (for example "12ms" or "REP=00").
// It is only populated on passed results.
Metric string `json:"metric,omitempty"`
// Error is the failure text, "cancelled" for a cancelled test, or the
// skip reason on skipped results.
Error string `json:"error,omitempty"`
// Hint is the text returned by hintFor for this test and error.
Hint string `json:"hint,omitempty"`
// RawHex is the hex payload lifted from a "(raw=...)" marker in the
// error text, when one is present.
RawHex string `json:"raw_hex,omitempty"`
// Duration is the wall-clock time of the attempt as measured by
// runAttempt; it is left zero on running, skipped and pre-flight
// failure results.
// NOTE(review): time.Duration is an integer nanosecond count and has no
// custom JSON encoding, so encoding/json writes nanoseconds under the
// "duration_ms" key — confirm what consumers of this field expect.
Duration time.Duration `json:"duration_ms"`
// Attempt is the 1-based attempt number; skipped results leave it zero.
Attempt int `json:"attempt"`
}
// Config is the input to Run. Any tuning knob left at its zero value is
// replaced with a default by applyDefaults before the run starts.
type Config struct {
	// SOCKS5 proxy endpoint under test.
	ProxyHost string
	ProxyPort int

	// Username/password authentication. When UseAuth is false the "auth"
	// test is not run and no "auth" Result is emitted.
	UseAuth       bool
	ProxyLogin    string
	ProxyPassword string

	// PerTestTimeout bounds a single attempt of a single test.
	PerTestTimeout time.Duration
	// MaxRetries is the number of extra attempts after the first one.
	MaxRetries int
	// RetryBackoff is the pause between two attempts of the same test.
	RetryBackoff time.Duration

	// Targets reached through the proxy.
	DiscordGateway string // host:port for the CONNECT test
	DiscordAPI     string // URL for the HTTP test
	StunServer     string // host:port for the STUN test
}

// applyDefaults fills in every unset or non-positive knob of cfg and returns
// the result. cfg is received by value, so the caller's Config is untouched.
//
// MaxRetries cannot express "no retries": zero is indistinguishable from an
// unset field and is therefore promoted to the default of 1, as is any
// negative value.
func applyDefaults(cfg Config) Config {
	const (
		defaultTimeout = 5 * time.Second
		defaultRetries = 1
		defaultBackoff = 500 * time.Millisecond
		defaultGateway = "gateway.discord.gg:443"
		defaultAPI     = "https://discord.com/api/v9/gateway"
		defaultStun    = "stun.l.google.com:19302"
	)

	if cfg.PerTestTimeout <= 0 {
		cfg.PerTestTimeout = defaultTimeout
	}
	if cfg.MaxRetries <= 0 {
		cfg.MaxRetries = defaultRetries
	}
	if cfg.RetryBackoff <= 0 {
		cfg.RetryBackoff = defaultBackoff
	}
	if cfg.DiscordGateway == "" {
		cfg.DiscordGateway = defaultGateway
	}
	if cfg.DiscordAPI == "" {
		cfg.DiscordAPI = defaultAPI
	}
	if cfg.StunServer == "" {
		cfg.StunServer = defaultStun
	}
	return cfg
}
// Run starts the diagnostic in a background goroutine and returns the channel
// its Results are streamed on. The steps run strictly in order — tcp, greet,
// auth (only when cfg.UseAuth is set), connect, udp, stun, api — and the
// channel is closed once the last step has returned.
//
// Cancelling ctx aborts the run: the test that notices the cancellation emits
// a Failed Result with Error="cancelled", and every later test emits a single
// Skipped Result.
func Run(ctx context.Context, cfg Config) <-chan Result {
	results := make(chan Result, 16)
	exec := &executor{ctx: ctx, cfg: applyDefaults(cfg), ch: results}

	go func() {
		// Deferred calls run last-in first-out: connections are released
		// before the channel is closed.
		defer close(results)
		defer exec.cleanup()

		steps := []func(){exec.runTCP, exec.runGreet}
		if exec.cfg.UseAuth {
			steps = append(steps, exec.runAuth)
		}
		steps = append(steps, exec.runConnect, exec.runUDP, exec.runStun, exec.runAPI)

		for _, step := range steps {
			step()
		}
	}()
	return results
}
// executor carries shared state across the 7 test methods. Run creates one
// executor per call and invokes the test methods sequentially from a single
// goroutine.
type executor struct {
// ctx is the caller's context passed to Run; per-attempt contexts are
// derived from it in runAttempt.
ctx context.Context
// cfg is the Config after applyDefaults.
cfg Config
// ch is the send side of the channel returned by Run.
ch chan<- Result
// tcpConn is opened in runTCP and reused by greet/auth/connect.
tcpConn net.Conn
// udpConn2 is the SECOND TCP control channel opened in runUDP.
// Must stay alive until stun finishes — the SOCKS5 spec requires
// the control TCP connection to remain up for the relay to be
// valid.
udpConn2 net.Conn
// udpRelay is the UDP relay endpoint announced by the proxy in
// the UDP ASSOCIATE reply.
udpRelay *net.UDPAddr
// udpClient is our local UDP socket used to talk to the relay.
udpClient net.PacketConn
// Step gating: each xOK is set true on success.
tcpOK, greetOK, authOK, connectOK, udpOK bool
// Cancellation latch. Once any test emits a "cancelled" failure,
// remaining tests emit a single Skipped result with the same reason.
cancelled bool
}
// cleanup releases every connection the run may still hold: the shared TCP
// control connection, the UDP ASSOCIATE control connection and the local UDP
// socket. Fields that were never opened are skipped; close errors are ignored
// because nothing useful can be done with them at teardown.
func (e *executor) cleanup() {
	type closer interface{ Close() error }

	for _, c := range []closer{e.tcpConn, e.udpConn2, e.udpClient} {
		if c != nil {
			_ = c.Close()
		}
	}
}
// emit delivers r to the result channel. It blocks until the channel accepts
// the value or the run's context is done, so a stalled consumer cannot hang
// the run after cancellation.
func (e *executor) emit(r Result) {
	select {
	case e.ch <- r:
		return
	case <-e.ctx.Done():
	}

	// The context is done. Make one non-blocking attempt so the result is not
	// lost merely because cancellation raced the send; if the buffer is full
	// the result is dropped.
	select {
	case e.ch <- r:
	default:
	}
}
// emitSkipped reports that test id was not run. reason is carried in the
// Result's Error field; Attempt and Duration stay zero.
func (e *executor) emitSkipped(id, reason string) {
	skipped := Result{
		ID:     id,
		Status: StatusSkipped,
		Error:  reason,
	}
	e.emit(skipped)
}
// emitCancelled sets the cancellation latch and reports test id as failed
// with Error="cancelled". attempt and dur describe the attempt that was
// interrupted (dur is zero when the attempt never started).
func (e *executor) emitCancelled(id string, attempt int, dur time.Duration) {
	// Latch first so that every later test is skipped.
	e.cancelled = true

	r := Result{ID: id, Status: StatusFailed, Error: "cancelled"}
	r.Hint = hintFor(id, context.Canceled)
	r.Attempt = attempt
	r.Duration = dur
	e.emit(r)
}
// shouldSkip runs the guards shared by the test methods and emits the matching
// Result when the test must not run. The guards are checked in this order:
//
//  1. the cancellation latch is set       -> Skipped, reason "cancelled"
//  2. the prerequisite failed (!depOK)    -> Skipped, reason skipReason
//  3. the run's context is already done   -> Failed "cancelled" (sets the latch)
//
// It returns true when the caller must return without running the test.
func (e *executor) shouldSkip(id string, depOK bool) bool {
	switch {
	case e.cancelled:
		e.emitSkipped(id, "cancelled")
	case !depOK:
		e.emitSkipped(id, skipReason)
	case e.ctx.Err() != nil:
		e.emitCancelled(id, 1, 0)
	default:
		return false
	}
	return true
}

// skipReason is the Error text of a test skipped because a step it depends on
// did not pass.
const skipReason = "depends on previous failed step"
// rawHexRE matches the "(raw=<hex digits>)" marker appended to wrapped errors
// and captures the hex digits.
var rawHexRE = regexp.MustCompile(`\(raw=([0-9a-fA-F]+)\)`)

// extractRawHex returns the hex payload of the first "(raw=...)" marker in s,
// or "" when s contains no such marker.
func extractRawHex(s string) string {
	groups := rawHexRE.FindStringSubmatch(s)
	if groups == nil {
		return ""
	}
	return groups[1]
}
// runAttempt drives the attempt loop shared by all tests: it emits a Running
// result before each attempt and a Passed or Failed result after it, decides
// whether a failure is worth retrying, and sleeps between attempts.
//
// run performs one attempt under a context limited to PerTestTimeout and
// returns the success metric or an error. At most 1+MaxRetries attempts are
// made. The return value reports whether an attempt passed.
func (e *executor) runAttempt(id string, run func(ctx context.Context) (string, error)) (ok bool) {
	lastAttempt := e.cfg.MaxRetries + 1

	for n := 1; n <= lastAttempt; n++ {
		if e.ctx.Err() != nil {
			e.emitCancelled(id, n, 0)
			return false
		}
		e.emit(Result{ID: id, Status: StatusRunning, Attempt: n})

		attemptCtx, cancel := context.WithTimeout(e.ctx, e.cfg.PerTestTimeout)
		began := time.Now()
		metric, err := run(attemptCtx)
		elapsed := time.Since(began)
		cancel()

		switch {
		case err == nil:
			e.emit(Result{
				ID:       id,
				Status:   StatusPassed,
				Metric:   metric,
				Attempt:  n,
				Duration: elapsed,
			})
			return true
		case e.ctx.Err() != nil:
			// The caller's context is done (not merely the per-attempt
			// one): report cancellation and never retry into it.
			e.emitCancelled(id, n, elapsed)
			return false
		}

		// A context error with a healthy parent can only come from the
		// per-attempt deadline. That is our own time budget running out,
		// which is what retries are for, so it is treated as transient
		// without consulting classifyError.
		transient := isContextErr(err) || classifyError(err) == ClassificationTransient
		retry := transient && n < lastAttempt

		// Every failed attempt is reported, including ones that are about
		// to be retried, so observers see each attempt.
		msg := err.Error()
		e.emit(Result{
			ID:       id,
			Status:   StatusFailed,
			Error:    msg,
			Hint:     hintFor(id, err),
			RawHex:   extractRawHex(msg),
			Attempt:  n,
			Duration: elapsed,
		})
		if !retry {
			return false
		}

		// Back off, but give up at once if the caller cancels meanwhile.
		select {
		case <-time.After(e.cfg.RetryBackoff):
		case <-e.ctx.Done():
			return false
		}
	}
	return false
}
// proxyAddr formats the configured proxy host and port as a dialable
// "host:port" string.
func (e *executor) proxyAddr() string {
	port := strconv.Itoa(e.cfg.ProxyPort)
	return net.JoinHostPort(e.cfg.ProxyHost, port)
}
// runTCP — Test 1: open a TCP connection to the proxy.
//
// On success the connection is kept in e.tcpConn for the greet/auth/connect
// steps and the metric is the dial time in milliseconds.
func (e *executor) runTCP() {
	// tcp has no prerequisite step, so the dependency is always satisfied;
	// only the cancellation guards can stop it.
	if e.shouldSkip("tcp", true) {
		return
	}

	dial := func(ctx context.Context) (string, error) {
		// A retry must not leak the connection of an earlier attempt.
		if old := e.tcpConn; old != nil {
			_ = old.Close()
			e.tcpConn = nil
		}

		began := time.Now()
		conn, err := new(net.Dialer).DialContext(ctx, "tcp", e.proxyAddr())
		if err != nil {
			return "", err
		}
		e.tcpConn = conn
		return fmt.Sprintf("%dms", time.Since(began).Milliseconds()), nil
	}
	e.tcpOK = e.runAttempt("tcp", dial)
}
// runGreet — Test 2: SOCKS5 method negotiation on the shared control
// connection. The metric names the method the proxy selected.
func (e *executor) runGreet() {
	if e.shouldSkip("greet", e.tcpOK) {
		return
	}

	e.greetOK = e.runAttempt("greet", func(ctx context.Context) (string, error) {
		// The first attempt reuses the connection opened by runTCP. A failed
		// attempt clears e.tcpConn below, so a retry dials a new one here.
		if err := e.redialTCPIfNeeded(ctx); err != nil {
			return "", err
		}

		method, _, err := socks5Greeting(ctx, e.tcpConn, e.cfg.UseAuth)
		if err != nil {
			// The proxy may be left mid-handshake: discard the connection.
			_ = e.tcpConn.Close()
			e.tcpConn = nil
			return "", err
		}

		if method == 0x00 {
			return "no auth", nil
		}
		if method == 0x02 {
			return "auth required", nil
		}
		return fmt.Sprintf("method=0x%02X", method), nil
	})
}
// redialTCPIfNeeded makes sure e.tcpConn holds a connection to the proxy.
//
// It never closes an existing connection: when e.tcpConn is already set it
// returns nil immediately, otherwise it dials the proxy and stores the new
// connection. Callers that want a fresh connection on retry close e.tcpConn
// and set it to nil after a failed attempt.
func (e *executor) redialTCPIfNeeded(ctx context.Context) error {
	if e.tcpConn == nil {
		conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", e.proxyAddr())
		if err != nil {
			return err
		}
		e.tcpConn = conn
	}
	return nil
}
// runAuth — Test 3: user/pass sub-negotiation. Only emitted when UseAuth.
//
// The first attempt authenticates on the connection that runGreet already
// negotiated. Authentication cannot be replayed on its own — the proxy has
// moved past method negotiation — so a retry dials a new connection and
// repeats the greeting before authenticating again.
func (e *executor) runAuth() {
	if e.shouldSkip("auth", e.greetOK) {
		return
	}

	e.authOK = e.runAttempt("auth", func(ctx context.Context) (string, error) {
		// fail discards the control connection before reporting err, so the
		// next attempt starts from a fresh dial instead of reusing a
		// connection stuck part-way through the handshake.
		fail := func(err error) (string, error) {
			if e.tcpConn != nil {
				_ = e.tcpConn.Close()
				e.tcpConn = nil
			}
			return "", err
		}

		// A nil tcpConn means an earlier attempt failed and dropped it.
		if e.tcpConn == nil {
			var d net.Dialer
			conn, derr := d.DialContext(ctx, "tcp", e.proxyAddr())
			if derr != nil {
				return "", derr
			}
			e.tcpConn = conn
			if _, _, gerr := socks5Greeting(ctx, e.tcpConn, true); gerr != nil {
				// Without this reset the half-negotiated connection
				// would be reused for auth on the next attempt.
				return fail(gerr)
			}
		}

		if _, err := socks5Auth(ctx, e.tcpConn, e.cfg.ProxyLogin, e.cfg.ProxyPassword); err != nil {
			return fail(err)
		}
		return "ok", nil
	})
}
// runConnect — Test 4: SOCKS5 CONNECT to Discord gateway.
//
// It requires a passed greeting and, when UseAuth is set, a passed auth step.
// A malformed DiscordGateway is reported as a single Failed result without
// entering the attempt loop. The first attempt issues CONNECT on the shared
// control connection; a retry dials a new connection and repeats greeting
// and (if enabled) authentication first.
func (e *executor) runConnect() {
	dep := e.greetOK && (!e.cfg.UseAuth || e.authOK)
	if e.shouldSkip("connect", dep) {
		return
	}

	host, portStr, splitErr := net.SplitHostPort(e.cfg.DiscordGateway)
	if splitErr != nil {
		e.emit(Result{
			ID:      "connect",
			Status:  StatusFailed,
			Error:   fmt.Sprintf("bad DiscordGateway %q: %s", e.cfg.DiscordGateway, splitErr.Error()),
			Hint:    hintFor("connect", splitErr),
			Attempt: 1,
		})
		return
	}
	port64, perr := strconv.ParseUint(portStr, 10, 16)
	if perr != nil {
		e.emit(Result{
			ID:      "connect",
			Status:  StatusFailed,
			Error:   fmt.Sprintf("bad DiscordGateway port %q: %s", portStr, perr.Error()),
			Hint:    hintFor("connect", perr),
			Attempt: 1,
		})
		return
	}
	port := uint16(port64)

	e.connectOK = e.runAttempt("connect", func(ctx context.Context) (string, error) {
		// fail discards the control connection before reporting err, so the
		// next attempt starts from a fresh dial instead of sending CONNECT
		// on a connection stuck part-way through the handshake.
		fail := func(err error) (string, error) {
			if e.tcpConn != nil {
				_ = e.tcpConn.Close()
				e.tcpConn = nil
			}
			return "", err
		}

		// A nil tcpConn means an earlier attempt failed and dropped it:
		// redial and redo the whole handshake before CONNECT.
		if e.tcpConn == nil {
			var d net.Dialer
			conn, derr := d.DialContext(ctx, "tcp", e.proxyAddr())
			if derr != nil {
				return "", derr
			}
			e.tcpConn = conn
			if _, _, gerr := socks5Greeting(ctx, e.tcpConn, e.cfg.UseAuth); gerr != nil {
				return fail(gerr)
			}
			if e.cfg.UseAuth {
				if _, aerr := socks5Auth(ctx, e.tcpConn, e.cfg.ProxyLogin, e.cfg.ProxyPassword); aerr != nil {
					return fail(aerr)
				}
			}
		}

		if _, err := socks5Connect(ctx, e.tcpConn, host, port); err != nil {
			return fail(err)
		}
		return "REP=00", nil
	})
}
// runUDP — Test 5: UDP ASSOCIATE on a second, dedicated control connection.
//
// It requires a passed greeting and, when UseAuth is set, a passed auth step.
// On success the control connection stays open in e.udpConn2 and the relay
// address announced by the proxy is stored in e.udpRelay for the stun test.
func (e *executor) runUDP() {
	authSatisfied := !e.cfg.UseAuth || e.authOK
	if e.shouldSkip("udp", e.greetOK && authSatisfied) {
		return
	}

	associate := func(ctx context.Context) (string, error) {
		// Every attempt gets its own control connection; drop the previous one.
		if prev := e.udpConn2; prev != nil {
			_ = prev.Close()
			e.udpConn2 = nil
		}

		ctrl, dialErr := (&net.Dialer{}).DialContext(ctx, "tcp", e.proxyAddr())
		if dialErr != nil {
			return "", dialErr
		}
		// Stored right away so a later attempt or cleanup closes it even if
		// the handshake below fails.
		e.udpConn2 = ctrl

		if _, _, greetErr := socks5Greeting(ctx, ctrl, e.cfg.UseAuth); greetErr != nil {
			return "", greetErr
		}
		if e.cfg.UseAuth {
			if _, authErr := socks5Auth(ctx, ctrl, e.cfg.ProxyLogin, e.cfg.ProxyPassword); authErr != nil {
				return "", authErr
			}
		}

		relay, _, assocErr := socks5UDPAssociate(ctx, ctrl)
		if assocErr != nil {
			return "", assocErr
		}
		e.udpRelay = relay
		return fmt.Sprintf("relay %s:%d", relay.IP.String(), relay.Port), nil
	}
	e.udpOK = e.runAttempt("udp", associate)
}
// runStun — Test 6: STUN through the SOCKS5 UDP relay.
//
// It requires a passed udp step (which provides e.udpRelay). A malformed
// StunServer is reported as a single Failed result without entering the
// attempt loop. Each attempt resolves the STUN host to IPv4, sends one
// binding request wrapped in a SOCKS5 UDP header to the relay, and parses the
// first datagram that comes back. The metric is the request/response RTT.
func (e *executor) runStun() {
	if e.shouldSkip("stun", e.udpOK) {
		return
	}

	host, portStr, splitErr := net.SplitHostPort(e.cfg.StunServer)
	if splitErr != nil {
		e.emit(Result{
			ID:      "stun",
			Status:  StatusFailed,
			Error:   fmt.Sprintf("bad StunServer %q: %s", e.cfg.StunServer, splitErr.Error()),
			Hint:    hintFor("stun", splitErr),
			Attempt: 1,
		})
		return
	}
	port64, perr := strconv.ParseUint(portStr, 10, 16)
	if perr != nil {
		e.emit(Result{
			ID:      "stun",
			Status:  StatusFailed,
			Error:   fmt.Sprintf("bad StunServer port %q: %s", portStr, perr.Error()),
			Hint:    hintFor("stun", perr),
			Attempt: 1,
		})
		return
	}
	stunPort := uint16(port64)

	e.runAttempt("stun", func(ctx context.Context) (string, error) {
		// Resolve STUN host to an IPv4. We don't support IPv6 STUN.
		ips, err := (&net.Resolver{}).LookupIP(ctx, "ip4", host)
		if err != nil {
			return "", fmt.Errorf("stun: lookup %s: %w", host, err)
		}
		var stunIP4 net.IP
		for _, ip := range ips {
			if v4 := ip.To4(); v4 != nil {
				stunIP4 = v4
				break
			}
		}
		if stunIP4 == nil {
			return "", errors.New("stun: no IPv4 for STUN server")
		}

		// Open a fresh local UDP socket per attempt.
		if e.udpClient != nil {
			_ = e.udpClient.Close()
			e.udpClient = nil
		}
		pc, err := net.ListenPacket("udp", ":0")
		if err != nil {
			return "", fmt.Errorf("stun: listen udp: %w", err)
		}
		e.udpClient = pc
		if dl, ok := ctx.Deadline(); ok {
			_ = pc.SetDeadline(dl)
		}

		// The socket deadline only encodes the per-attempt timeout. Watch
		// ctx as well so that a caller-side cancel unblocks the pending
		// read at once instead of after the rest of PerTestTimeout.
		watchDone := make(chan struct{})
		defer close(watchDone)
		go func() {
			select {
			case <-ctx.Done():
				_ = pc.SetDeadline(time.Now())
			case <-watchDone:
			}
		}()

		// Build SOCKS5 UDP datagram: RSV(2)=0 FRAG=0 ATYP=01 IP(4) PORT(2) STUN(20)
		txID, err := NewTransactionID()
		if err != nil {
			return "", err
		}
		stunReq := EncodeBindingRequest(txID)
		dgram := make([]byte, 0, 10+len(stunReq))
		dgram = append(dgram, 0x00, 0x00, 0x00, 0x01)
		dgram = append(dgram, stunIP4...)
		var portBuf [2]byte
		binary.BigEndian.PutUint16(portBuf[:], stunPort)
		dgram = append(dgram, portBuf[:]...)
		dgram = append(dgram, stunReq...)

		start := time.Now()
		if _, werr := pc.WriteTo(dgram, e.udpRelay); werr != nil {
			return "", fmt.Errorf("stun: write to relay: %w", werr)
		}
		readBuf := make([]byte, 1500)
		n, _, rerr := pc.ReadFrom(readBuf)
		if rerr != nil {
			return "", fmt.Errorf("stun: read from relay: %w", rerr)
		}
		rtt := time.Since(start)

		// 10 bytes is the smallest SOCKS5 UDP header (the IPv4 form); the
		// fixed offsets read below all lie within it.
		if n < 10 {
			return "", fmt.Errorf("stun: relay reply too short (%d bytes)", n)
		}
		// Validate the fixed part of the SOCKS5 UDP wrapper: RSV=00 00, FRAG=00.
		if readBuf[0] != 0x00 || readBuf[1] != 0x00 || readBuf[2] != 0x00 {
			return "", fmt.Errorf("stun: bad SOCKS5 UDP header (raw=%x)", readBuf[:10])
		}

		// The header length depends on the address type the relay used for
		// the source address: IPv4, IPv6 or a length-prefixed domain name.
		var hdrLen int
		switch readBuf[3] {
		case 0x01:
			hdrLen = 4 + 4 + 2
		case 0x04:
			hdrLen = 4 + 16 + 2
		case 0x03:
			hdrLen = 4 + 1 + int(readBuf[4]) + 2
		default:
			return "", fmt.Errorf("stun: unknown SOCKS5 UDP ATYP=0x%02X", readBuf[3])
		}
		if n < hdrLen {
			return "", fmt.Errorf("stun: relay reply truncated (%d < %d)", n, hdrLen)
		}

		if _, _, parseErr := ParseBindingResponse(readBuf[hdrLen:n], txID); parseErr != nil {
			return "", parseErr
		}
		return fmt.Sprintf("%dms RTT", rtt.Milliseconds()), nil
	})
}
// runAPI — Test 7: HTTP GET of the Discord API URL through the proxy.
//
// It requires a passed connect step. Each attempt uses its own HTTP client
// whose transport dials via dialThroughProxy. The test passes on HTTP 200 or
// 401; any other status is a failure.
func (e *executor) runAPI() {
	if e.shouldSkip("api", e.connectOK) {
		return
	}

	probe := func(ctx context.Context) (string, error) {
		client := &http.Client{
			Timeout: e.cfg.PerTestTimeout,
			Transport: &http.Transport{
				// Every connection is tunnelled through the SOCKS5 proxy;
				// the network argument is ignored.
				DialContext: func(dialCtx context.Context, _ string, addr string) (net.Conn, error) {
					return e.dialThroughProxy(dialCtx, addr)
				},
				TLSClientConfig:       &tls.Config{},
				DisableKeepAlives:     true,
				ResponseHeaderTimeout: e.cfg.PerTestTimeout,
			},
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, e.cfg.DiscordAPI, nil)
		if err != nil {
			return "", err
		}
		resp, err := client.Do(req)
		if err != nil {
			return "", err
		}
		defer resp.Body.Close()

		switch resp.StatusCode {
		case http.StatusOK, http.StatusUnauthorized:
			return fmt.Sprintf("HTTP %d", resp.StatusCode), nil
		default:
			return "", fmt.Errorf("api: HTTP %d", resp.StatusCode)
		}
	}
	e.runAttempt("api", probe)
}
// dialThroughProxy backs the http.Transport.DialContext hook used by runAPI.
// It dials the SOCKS5 proxy, runs greeting, optional authentication and
// CONNECT to addr (a "host:port" string), and returns the tunnelled
// connection. The connection is closed before returning if any handshake step
// fails.
func (e *executor) dialThroughProxy(ctx context.Context, addr string) (net.Conn, error) {
	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, fmt.Errorf("api: split %q: %w", addr, err)
	}
	port64, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return nil, fmt.Errorf("api: bad port %q: %w", portStr, err)
	}
	port := uint16(port64)

	conn, err := new(net.Dialer).DialContext(ctx, "tcp", e.proxyAddr())
	if err != nil {
		return nil, err
	}

	// handshake runs the SOCKS5 exchange on conn and reports the first error.
	handshake := func() error {
		if _, _, gerr := socks5Greeting(ctx, conn, e.cfg.UseAuth); gerr != nil {
			return gerr
		}
		if e.cfg.UseAuth {
			if _, aerr := socks5Auth(ctx, conn, e.cfg.ProxyLogin, e.cfg.ProxyPassword); aerr != nil {
				return aerr
			}
		}
		if _, cerr := socks5Connect(ctx, conn, host, port); cerr != nil {
			return cerr
		}
		return nil
	}
	if herr := handshake(); herr != nil {
		_ = conn.Close()
		return nil, herr
	}

	// Clear the deadline the socks5* primitives applied; http.Transport
	// manages timing from here on.
	_ = conn.SetDeadline(time.Time{})
	return conn, nil
}