package backtest

import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"nofx/kernel"
	"nofx/logger"
	"nofx/mcp"
)

// Sentinel errors declared for the run loop.
// NOTE(review): their use sites are outside the portion of the file reviewed
// here; presumably they signal normal completion and account liquidation.
var (
	errBacktestCompleted = errors.New("backtest completed")
	errLiquidated        = errors.New("account liquidated")
)

// NOTE(review): neither constant is referenced in the portion of the file
// reviewed here; the names suggest a metrics flush cadence and an AI decision
// retry cap — confirm against the run loop.
const (
	metricsWriteInterval = 5 * time.Second
	aiDecisionMaxRetries = 3
)

// Runner encapsulates the lifecycle of a single backtest run.
//
// A Runner contains mutexes and a sync.Once and must therefore be used through
// a pointer; NewRunner returns *Runner.
type Runner struct {
	// Immutable-after-construction collaborators, all populated by NewRunner.
	cfg            BacktestConfig
	feed           *DataFeed
	account        *BacktestAccount
	strategyEngine *kernel.StrategyEngine
	decisionLogDir string
	mcpClient      mcp.AIClient

	// statusMu guards status (see Start and Status).
	statusMu sync.RWMutex
	status   RunState

	// stateMu guards state (see snapshotState).
	stateMu sync.RWMutex
	state   *BacktestState

	// Control channels. pauseCh, resumeCh and stopCh are created with a buffer
	// of one and are signalled without blocking by Pause, Resume and Stop.
	// Wait blocks on doneCh.
	pauseCh  chan struct{}
	resumeCh chan struct{}
	stopCh   chan struct{}
	doneCh   chan struct{}

	// err is returned by Wait, which reads it while holding statusMu.
	err error
	// errMu guards lastError (see setLastError and lastErrorString).
	errMu     sync.RWMutex
	lastError string

	// createdAt is set once in NewRunner (UTC).
	// NOTE(review): lastCheckpoint and lastMetricsWrite are not written in the
	// portion of the file reviewed here.
	lastCheckpoint   time.Time
	createdAt        time.Time
	lastMetricsWrite time.Time

	// aiCache is nil unless caching/replay is enabled in cfg; cachePath is the
	// file it was loaded from.
	aiCache   *AICache
	cachePath string

	// Run-lock bookkeeping: lockInfo is the handle returned by acquireRunLock
	// and lockStop is closed by releaseLock to stop the heartbeat goroutine.
	lockInfo     *RunLockInfo
	lockStop     chan struct{}
	lockStopOnce sync.Once // Ensures lockStop is closed only once
}

// NewRunner constructs a backtest runner.
func NewRunner(cfg BacktestConfig, mcpClient mcp.AIClient) (*Runner, error) { if err := ensureRunDir(cfg.RunID); err != nil { return nil, err } client, err := configureMCPClient(cfg, mcpClient) if err != nil { return nil, err } feed, err := NewDataFeed(cfg) if err != nil { return nil, err } if err := os.MkdirAll(decisionLogDir(cfg.RunID), 0o755); err != nil { return nil, err } dLogDir := decisionLogDir(cfg.RunID) account := NewBacktestAccount(cfg.InitialBalance, cfg.FeeBps, cfg.SlippageBps) createdAt := time.Now().UTC() state := &BacktestState{ Positions: make(map[string]PositionSnapshot), Cash: account.Cash(), Equity: cfg.InitialBalance, UnrealizedPnL: 0, RealizedPnL: 0, MaxEquity: cfg.InitialBalance, MinEquity: cfg.InitialBalance, MaxDrawdownPct: 0, LastUpdate: createdAt, } var ( aiCache *AICache cachePath string ) if cfg.CacheAI || cfg.ReplayOnly || cfg.SharedAICachePath != "" { cachePath = cfg.SharedAICachePath if cachePath == "" { cachePath = filepath.Join(runDir(cfg.RunID), "ai_cache.json") } cache, err := LoadAICache(cachePath) if err != nil { return nil, fmt.Errorf("load ai cache: %w", err) } aiCache = cache } // Create strategy engine from backtest config for unified prompt generation strategyConfig := cfg.ToStrategyConfig() strategyEngine := kernel.NewStrategyEngine(strategyConfig) r := &Runner{ cfg: cfg, feed: feed, account: account, strategyEngine: strategyEngine, decisionLogDir: dLogDir, mcpClient: client, status: RunStateCreated, state: state, pauseCh: make(chan struct{}, 1), resumeCh: make(chan struct{}, 1), stopCh: make(chan struct{}, 1), doneCh: make(chan struct{}), createdAt: createdAt, aiCache: aiCache, cachePath: cachePath, } if err := r.initLock(); err != nil { return nil, err } return r, nil } func (r *Runner) initLock() error { if r.cfg.RunID == "" { return fmt.Errorf("run_id required for lock") } info, err := acquireRunLock(r.cfg.RunID) if err != nil { return err } r.lockInfo = info r.lockStop = make(chan struct{}) go r.lockHeartbeatLoop() 
return nil } func (r *Runner) lockHeartbeatLoop() { ticker := time.NewTicker(lockHeartbeatInterval) defer ticker.Stop() for { select { case <-ticker.C: if err := updateRunLockHeartbeat(r.lockInfo); err != nil { logger.Infof("failed to update lock heartbeat for %s: %v", r.cfg.RunID, err) } case <-r.lockStop: return } } } func (r *Runner) releaseLock() { // Use sync.Once to ensure channel is closed exactly once, preventing panic on double-close r.lockStopOnce.Do(func() { if r.lockStop != nil { close(r.lockStop) } }) if err := deleteRunLock(r.cfg.RunID); err != nil { logger.Infof("failed to release lock for %s: %v", r.cfg.RunID, err) } r.lockInfo = nil } // Start launches the backtest loop. func (r *Runner) Start(ctx context.Context) error { r.statusMu.Lock() if r.status != RunStateCreated && r.status != RunStatePaused { r.statusMu.Unlock() return fmt.Errorf("cannot start runner in state %s", r.status) } r.status = RunStateRunning r.statusMu.Unlock() go r.loop(ctx) return nil } // PersistMetadata writes the current snapshot to run.json. func (r *Runner) PersistMetadata() { r.persistMetadata() } func (r *Runner) setLastError(err error) { r.errMu.Lock() defer r.errMu.Unlock() if err == nil { r.lastError = "" return } r.lastError = err.Error() } func (r *Runner) lastErrorString() string { r.errMu.RLock() defer r.errMu.RUnlock() return r.lastError } // CurrentMetadata returns the metadata corresponding to the current in-memory state. 
func (r *Runner) CurrentMetadata() *RunMetadata { state := r.snapshotState() meta := r.buildMetadata(state, r.Status()) meta.CreatedAt = r.createdAt meta.UpdatedAt = state.LastUpdate return meta } func (r *Runner) Pause() { select { case r.pauseCh <- struct{}{}: default: } } func (r *Runner) Resume() { select { case r.resumeCh <- struct{}{}: default: } } func (r *Runner) Stop() { select { case r.stopCh <- struct{}{}: default: } } func (r *Runner) Wait() error { <-r.doneCh r.statusMu.RLock() defer r.statusMu.RUnlock() return r.err } // Status returns the current run state. func (r *Runner) Status() RunState { r.statusMu.RLock() defer r.statusMu.RUnlock() return r.status } // StatusPayload builds the status response for the API. func (r *Runner) StatusPayload() StatusPayload { snapshot := r.snapshotState() progress := progressPercent(snapshot, r.cfg) // Build position statuses with unrealized P&L positions := make([]PositionStatus, 0, len(snapshot.Positions)) for _, pos := range snapshot.Positions { if pos.Quantity <= 0 { continue } // Get mark price from feed if available markPrice := pos.AvgPrice // fallback to entry price if r.feed != nil && snapshot.BarTimestamp > 0 { if md, _, err := r.feed.BuildMarketData(snapshot.BarTimestamp); err == nil { if data, ok := md[pos.Symbol]; ok { markPrice = data.CurrentPrice } } } // Calculate unrealized P&L var unrealizedPnL float64 if pos.Side == "long" { unrealizedPnL = (markPrice - pos.AvgPrice) * pos.Quantity } else { unrealizedPnL = (pos.AvgPrice - markPrice) * pos.Quantity } // Calculate P&L percentage based on margin pnlPct := 0.0 if pos.MarginUsed > 0 { pnlPct = (unrealizedPnL / pos.MarginUsed) * 100 } positions = append(positions, PositionStatus{ Symbol: pos.Symbol, Side: pos.Side, Quantity: pos.Quantity, EntryPrice: pos.AvgPrice, MarkPrice: markPrice, Leverage: pos.Leverage, UnrealizedPnL: unrealizedPnL, UnrealizedPnLPct: pnlPct, MarginUsed: pos.MarginUsed, }) } payload := StatusPayload{ RunID: r.cfg.RunID, State: 
r.Status(), ProgressPct: progress, ProcessedBars: snapshot.BarIndex, CurrentTime: snapshot.BarTimestamp, DecisionCycle: snapshot.DecisionCycle, Equity: snapshot.Equity, UnrealizedPnL: snapshot.UnrealizedPnL, RealizedPnL: snapshot.RealizedPnL, Positions: positions, Note: snapshot.LiquidationNote, LastError: r.lastErrorString(), LastUpdatedIso: snapshot.LastUpdate.UTC().Format(time.RFC3339), } return payload } func (r *Runner) snapshotState() BacktestState { r.stateMu.RLock() defer r.stateMu.RUnlock() copyState := *r.state copyState.Positions = make(map[string]PositionSnapshot, len(r.state.Positions)) for k, v := range r.state.Positions { copyState.Positions[k] = v } return copyState }