package kernel

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"nofx/logger"
	"nofx/market"
	"nofx/provider/hyperliquid"
	"nofx/provider/nofxos"
	"nofx/provider/vergex"
	"nofx/security"
	"nofx/store"
	"os"
	"sort"
	"strings"
	"sync"
	"time"
)

// ============================================================================
// Type Definitions
// ============================================================================

// PositionInfo is the snapshot of one open position.
type PositionInfo struct {
	Symbol           string  `json:"symbol"`
	Side             string  `json:"side"` // either "long" or "short"
	EntryPrice       float64 `json:"entry_price"`
	MarkPrice        float64 `json:"mark_price"`
	Quantity         float64 `json:"quantity"`
	Leverage         int     `json:"leverage"`
	UnrealizedPnL    float64 `json:"unrealized_pnl"`
	UnrealizedPnLPct float64 `json:"unrealized_pnl_pct"`
	PeakPnLPct       float64 `json:"peak_pnl_pct"` // highest profit percentage seen so far
	LiquidationPrice float64 `json:"liquidation_price"`
	MarginUsed       float64 `json:"margin_used"`
	UpdateTime       int64   `json:"update_time"` // time of the last position update, in milliseconds
}

// AccountInfo is the account-level summary.
type AccountInfo struct {
	TotalEquity      float64 `json:"total_equity"`      // account equity
	AvailableBalance float64 `json:"available_balance"` // balance still available
	UnrealizedPnL    float64 `json:"unrealized_pnl"`    // unrealized profit/loss
	TotalPnL         float64 `json:"total_pnl"`         // total profit/loss
	TotalPnLPct      float64 `json:"total_pnl_pct"`     // total profit/loss as a percentage
	MarginUsed       float64 `json:"margin_used"`       // margin currently in use
	MarginUsedPct    float64 `json:"margin_used_pct"`   // margin usage rate
	PositionCount    int     `json:"position_count"`    // number of positions
}

// CandidateCoin is one entry of the coin pool, together with the names of the
// sources that nominated it.
type CandidateCoin struct {
	Symbol  string   `json:"symbol"`
	Sources []string `json:"sources"` // source names, e.g. "ai500" and/or "oi_top"
}

// OITopData holds open-interest growth ranking figures used as reference
// material for the AI decision.
type OITopData struct {
	Rank              int     // position in the OI Top ranking
	OIDeltaPercent    float64 // open interest change percentage (1 hour)
	OIDeltaValue      float64 // open interest change value
	PriceDeltaPercent float64 // price change percentage
}

// TradingStats aggregates trading statistics that are fed to the AI.
type TradingStats struct {
	TotalTrades    int     `json:"total_trades"`     // number of closed trades
	WinRate        float64 `json:"win_rate"`         // win rate (%)
	ProfitFactor   float64 `json:"profit_factor"`    // profit factor
	SharpeRatio    float64 `json:"sharpe_ratio"`     // Sharpe ratio
	TotalPnL       float64 `json:"total_pnl"`        // total profit/loss
	AvgWin         float64 `json:"avg_win"`          // average win
	AvgLoss        float64 `json:"avg_loss"`         // average loss
	MaxDrawdownPct float64 `json:"max_drawdown_pct"` // maximum drawdown (%)
}

// RecentOrder is a recently completed order that is fed to the AI.
type RecentOrder struct {
	Symbol       string  `json:"symbol"`        // trading pair
	Side         string  `json:"side"`          // long/short
	EntryPrice   float64 `json:"entry_price"`   // entry price
	ExitPrice    float64 `json:"exit_price"`    // exit price
	RealizedPnL  float64 `json:"realized_pnl"`  // realized profit/loss
	PnLPct       float64 `json:"pnl_pct"`       // profit/loss percentage
	EntryTime    string  `json:"entry_time"`    // entry time
	ExitTime     string  `json:"exit_time"`     // exit time
	HoldDuration string  `json:"hold_duration"` // how long the position was held, e.g. "2h30m"
}

// Context is the complete trading context handed to the AI. Fields tagged
// `json:"-"` are excluded from JSON encoding of this struct.
type Context struct {
	CurrentTime    string          `json:"current_time"`
	RuntimeMinutes int             `json:"runtime_minutes"`
	CallCount      int             `json:"call_count"`
	Account        AccountInfo     `json:"account"`
	Positions      []PositionInfo  `json:"positions"`
	CandidateCoins []CandidateCoin `json:"candidate_coins"`
	PromptVariant  string          `json:"prompt_variant,omitempty"`
	TradingStats   *TradingStats   `json:"trading_stats,omitempty"`
	RecentOrders   []RecentOrder   `json:"recent_orders,omitempty"`

	MarketDataMap map[string]*market.Data            `json:"-"`
	MultiTFMarket map[string]map[string]*market.Data `json:"-"`
	OITopDataMap  map[string]*OITopData              `json:"-"`
	QuantDataMap  map[string]*QuantData              `json:"-"`
	VergexDataMap map[string]*vergex.MarketAnalysis  `json:"-"`

	OIRankingData      *nofxos.OIRankingData      `json:"-"` // market-wide OI ranking data
	NetFlowRankingData *nofxos.NetFlowRankingData `json:"-"` // market-wide fund flow ranking data
	PriceRankingData   *nofxos.PriceRankingData   `json:"-"` // market-wide price gainers/losers

	BTCETHLeverage  int      `json:"-"`
	AltcoinLeverage int      `json:"-"`
	Timeframes      []string `json:"-"`
}

// Decision is a single AI trading decision.
type Decision struct {
	Symbol string `json:"symbol"`
	// Standard actions: "open_long", "open_short", "close_long", "close_short", "hold", "wait"
	// Grid actions: "place_buy_limit", "place_sell_limit", "cancel_order", "cancel_all_orders", "pause_grid", "resume_grid", "adjust_grid"
	Action string `json:"action"`

	// Parameters for opening a position
	Leverage        int     `json:"leverage,omitempty"`
	PositionSizeUSD float64 `json:"position_size_usd,omitempty"`
	StopLoss        float64 `json:"stop_loss,omitempty"`
	TakeProfit      float64 `json:"take_profit,omitempty"`

	// Parameters for grid trading
	Price      float64 `json:"price,omitempty"`       // limit order price (for grid)
	Quantity   float64 `json:"quantity,omitempty"`    // order quantity (for grid)
	LevelIndex int     `json:"level_index,omitempty"` // grid level index
	OrderID    string  `json:"order_id,omitempty"`    // order ID (for cancel)

	// Parameters shared by all actions
	Confidence int     `json:"confidence,omitempty"` // confidence level (0-100)
	RiskUSD    float64 `json:"risk_usd,omitempty"`   // maximum USD risk
	Reasoning  string  `json:"reasoning"`
}

// FullDecision is the AI's complete output for one call: the prompts, the
// chain-of-thought trace, the raw response and the parsed decisions.
type FullDecision struct {
	SystemPrompt        string     `json:"system_prompt"`
	UserPrompt          string     `json:"user_prompt"`
	CoTTrace            string     `json:"cot_trace"`
	Decisions           []Decision `json:"decisions"`
	RawResponse         string     `json:"raw_response"`
	Timestamp           time.Time  `json:"timestamp"`
	AIRequestDurationMs int64      `json:"ai_request_duration_ms,omitempty"`
}

// QuantData is the quantitative data for one symbol: fund flow, open-interest
// changes and price changes.
type QuantData struct {
	Symbol      string             `json:"symbol"`
	Price       float64            `json:"price"`
	Netflow     *NetflowData       `json:"netflow,omitempty"`
	OI          map[string]*OIData `json:"oi,omitempty"`
	PriceChange map[string]float64 `json:"price_change,omitempty"`
}

// NetflowData splits fund-flow figures into an Institution and a Personal part.
type NetflowData struct {
	Institution *FlowTypeData `json:"institution,omitempty"`
	Personal    *FlowTypeData `json:"personal,omitempty"`
}

// FlowTypeData carries the Future and Spot flow maps of one NetflowData part.
type FlowTypeData struct {
	Future map[string]float64 `json:"future,omitempty"`
	Spot   map[string]float64 `json:"spot,omitempty"`
}

// OIData is the current open interest together with its keyed deltas.
type OIData struct {
	CurrentOI float64                 `json:"current_oi"`
	Delta     map[string]*OIDeltaData `json:"delta,omitempty"`
}

// OIDeltaData is one open-interest delta entry of OIData.Delta.
type OIDeltaData struct {
	OIDelta        float64 `json:"oi_delta"`
	OIDeltaValue   float64 `json:"oi_delta_value"`
	OIDeltaPercent float64 `json:"oi_delta_percent"`
}

// ============================================================================
// StrategyEngine - Core Strategy Execution Engine
// ============================================================================

// StrategyEngine is the strategy execution engine. It bundles the strategy
// configuration with the NofxOS and Vergex clients and a per-symbol cache of
// Vergex signal ranking items.
type StrategyEngine struct {
	config             *store.StrategyConfig
	nofxosClient       *nofxos.Client
	vergexClient       *vergex.Client
	vergexRankingCache map[string]*vergex.SignalRankItem
}

// NewStrategyEngine creates strategy execution engine.
// claw402WalletKey is optional — if provided, nofxos data requests are routed through claw402. func NewStrategyEngine(config *store.StrategyConfig, claw402WalletKey ...string) *StrategyEngine { // Create NofxOS client with API key from config apiKey := config.Indicators.NofxOSAPIKey if apiKey == "" { apiKey = nofxos.DefaultAuthKey } client := nofxos.NewClient(nofxos.DefaultBaseURL, apiKey) // If claw402 wallet key is provided (from trader's AI config), route through claw402 walletKey := "" if len(claw402WalletKey) > 0 { walletKey = claw402WalletKey[0] } if walletKey == "" { walletKey = os.Getenv("CLAW402_WALLET_KEY") } if walletKey != "" { claw402URL := os.Getenv("CLAW402_URL") if claw402URL == "" { claw402URL = "https://claw402.ai" } claw402Client, err := nofxos.NewClaw402DataClient(claw402URL, walletKey, &logger.MCPLogger{}) if err == nil { client.SetClaw402(claw402Client) logger.Infof("🔗 NofxOS data routed through claw402 (%s)", claw402URL) } else { logger.Warnf("⚠️ Failed to init claw402 data client: %v (using direct nofxos.ai)", err) } vergexClient, err := vergex.NewClient(claw402URL, walletKey, &logger.MCPLogger{}) if err == nil { logger.Infof("🔗 Vergex signals routed through claw402 (%s)", claw402URL) } else { logger.Warnf("⚠️ Failed to init Vergex claw402 client: %v", err) } return &StrategyEngine{ config: config, nofxosClient: client, vergexClient: vergexClient, vergexRankingCache: make(map[string]*vergex.SignalRankItem), } } return &StrategyEngine{ config: config, nofxosClient: client, vergexRankingCache: make(map[string]*vergex.SignalRankItem), } } func (e *StrategyEngine) usesHyperliquidNativeUniverse() bool { if e == nil || e.config == nil { return false } source := e.config.CoinSource if source.SourceType == "hyper_all" || source.SourceType == "hyper_main" || source.SourceType == "hyper_rank" || source.SourceType == "vergex_signal" || source.UseHyperAll || source.UseHyperMain { return true } for _, symbol := range source.StaticCoins { if 
market.IsXyzDexAsset(symbol) { return true } } return false } // GetRiskControlConfig gets risk control configuration func (e *StrategyEngine) GetRiskControlConfig() store.RiskControlConfig { return e.config.RiskControl } // GetLanguage returns the language from config or falls back to auto-detection func (e *StrategyEngine) GetLanguage() Language { switch e.config.Language { case "zh": return LangChinese case "en": return LangEnglish default: // Fall back to auto-detection from prompt content for backward compatibility return detectLanguage(e.config.PromptSections.RoleDefinition) } } // GetConfig gets complete strategy configuration func (e *StrategyEngine) GetConfig() *store.StrategyConfig { return e.config } // ============================================================================ // Candidate Coins // ============================================================================ // GetCandidateCoins gets candidate coins based on strategy configuration func (e *StrategyEngine) GetCandidateCoins() ([]CandidateCoin, error) { var candidates []CandidateCoin symbolSources := make(map[string][]string) coinSource := e.config.CoinSource switch coinSource.SourceType { case "static": for _, symbol := range coinSource.StaticCoins { symbol = market.Normalize(symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"static"}, }) } return e.filterExcludedCoins(candidates), nil case "ai500": // Check use_ai500 flag; if false, fall back to static coins if !coinSource.UseAI500 { logger.Infof("⚠️ source_type is 'ai500' but use_ai500 is false, falling back to static coins") for _, symbol := range coinSource.StaticCoins { symbol = market.Normalize(symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"static"}, }) } return e.filterExcludedCoins(candidates), nil } coins, err := e.getAI500Coins(coinSource.AI500Limit) if err != nil { return nil, err } // Empty list is a normal condition, return directly return 
e.filterExcludedCoins(coins), nil case "oi_top": // Check use_oi_top flag; if false, fall back to static coins if !coinSource.UseOITop { logger.Infof("⚠️ source_type is 'oi_top' but use_oi_top is false, falling back to static coins") for _, symbol := range coinSource.StaticCoins { symbol = market.Normalize(symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"static"}, }) } return e.filterExcludedCoins(candidates), nil } coins, err := e.getOITopCoins(coinSource.OITopLimit) if err != nil { return nil, err } // Empty list is a normal condition, return directly return e.filterExcludedCoins(coins), nil case "oi_low": // OI decrease ranking, suitable for short positions if !coinSource.UseOILow { logger.Infof("⚠️ source_type is 'oi_low' but use_oi_low is false, falling back to static coins") for _, symbol := range coinSource.StaticCoins { symbol = market.Normalize(symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"static"}, }) } return e.filterExcludedCoins(candidates), nil } coins, err := e.getOILowCoins(coinSource.OILowLimit) if err != nil { return nil, err } // Empty list is a normal condition, return directly return e.filterExcludedCoins(coins), nil case "hyper_all": // All Hyperliquid perp coins if !coinSource.UseHyperAll { logger.Infof("⚠️ source_type is 'hyper_all' but use_hyper_all is false, falling back to static coins") for _, symbol := range coinSource.StaticCoins { symbol = market.Normalize(symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"static"}, }) } return e.filterExcludedCoins(candidates), nil } coins, err := e.getHyperAllCoins() if err != nil { return nil, err } return e.filterExcludedCoins(coins), nil case "hyper_main": // Top N Hyperliquid coins by 24h volume if !coinSource.UseHyperMain { logger.Infof("⚠️ source_type is 'hyper_main' but use_hyper_main is false, falling back to static coins") for _, symbol := range coinSource.StaticCoins 
{ symbol = market.Normalize(symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"static"}, }) } return e.filterExcludedCoins(candidates), nil } coins, err := e.getHyperMainCoins(coinSource.HyperMainLimit) if err != nil { return nil, err } return e.filterExcludedCoins(coins), nil case "hyper_rank": coins, err := e.getHyperRankCoins(coinSource.HyperRankCategory, coinSource.HyperRankDirection, coinSource.HyperRankLimit) if err != nil { return nil, err } return e.filterExcludedCoins(coins), nil case "vergex_signal": coins, err := e.getVergexSignalCoins( coinSource.VergexLimit, coinSource.VergexMarketType, coinSource.VergexChain, coinSource.VergexLiqBand, coinSource.HyperRankCategory, coinSource.StaticCoins, ) if err != nil { return nil, err } return e.filterExcludedCoins(coins), nil case "mixed": if coinSource.UseAI500 { poolCoins, err := e.getAI500Coins(coinSource.AI500Limit) if err != nil { logger.Infof("⚠️ Failed to get AI500 coins: %v", err) } else { for _, coin := range poolCoins { symbolSources[coin.Symbol] = append(symbolSources[coin.Symbol], "ai500") } } } if coinSource.UseOITop { oiCoins, err := e.getOITopCoins(coinSource.OITopLimit) if err != nil { logger.Infof("⚠️ Failed to get OI Top: %v", err) } else { for _, coin := range oiCoins { symbolSources[coin.Symbol] = append(symbolSources[coin.Symbol], "oi_top") } } } if coinSource.UseOILow { oiLowCoins, err := e.getOILowCoins(coinSource.OILowLimit) if err != nil { logger.Infof("⚠️ Failed to get OI Low: %v", err) } else { for _, coin := range oiLowCoins { symbolSources[coin.Symbol] = append(symbolSources[coin.Symbol], "oi_low") } } } if coinSource.UseHyperAll { hyperCoins, err := e.getHyperAllCoins() if err != nil { logger.Infof("⚠️ Failed to get Hyperliquid All coins: %v", err) } else { for _, coin := range hyperCoins { symbolSources[coin.Symbol] = append(symbolSources[coin.Symbol], "hyper_all") } } } if coinSource.UseHyperMain { hyperMainCoins, err := 
e.getHyperMainCoins(coinSource.HyperMainLimit) if err != nil { logger.Infof("⚠️ Failed to get Hyperliquid Main coins: %v", err) } else { for _, coin := range hyperMainCoins { symbolSources[coin.Symbol] = append(symbolSources[coin.Symbol], "hyper_main") } } } for _, symbol := range coinSource.StaticCoins { symbol = market.Normalize(symbol) if _, exists := symbolSources[symbol]; !exists { symbolSources[symbol] = []string{"static"} } else { symbolSources[symbol] = append(symbolSources[symbol], "static") } } for symbol, sources := range symbolSources { candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: sources, }) } return e.filterExcludedCoins(candidates), nil default: return nil, fmt.Errorf("unknown coin source type: %s", coinSource.SourceType) } } // filterExcludedCoins removes excluded coins from the candidates list func (e *StrategyEngine) filterExcludedCoins(candidates []CandidateCoin) []CandidateCoin { if len(e.config.CoinSource.ExcludedCoins) == 0 { return candidates } // Build excluded set for O(1) lookup excluded := make(map[string]bool) for _, coin := range e.config.CoinSource.ExcludedCoins { normalized := market.Normalize(coin) excluded[normalized] = true } // Filter out excluded coins filtered := make([]CandidateCoin, 0, len(candidates)) for _, c := range candidates { if !excluded[c.Symbol] { filtered = append(filtered, c) } else { logger.Infof("🚫 Excluded coin: %s", c.Symbol) } } return filtered } func (e *StrategyEngine) getAI500Coins(limit int) ([]CandidateCoin, error) { if limit <= 0 { limit = 30 } symbols, err := e.nofxosClient.GetTopRatedCoins(limit) if err != nil { return nil, err } var candidates []CandidateCoin for _, symbol := range symbols { candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"ai500"}, }) } return candidates, nil } func (e *StrategyEngine) getOITopCoins(limit int) ([]CandidateCoin, error) { if limit <= 0 { limit = 10 } positions, err := e.nofxosClient.GetOITopPositions() if err 
!= nil { return nil, err } var candidates []CandidateCoin for i, pos := range positions { if i >= limit { break } symbol := market.Normalize(pos.Symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"oi_top"}, }) } return candidates, nil } func (e *StrategyEngine) getOILowCoins(limit int) ([]CandidateCoin, error) { if limit <= 0 { limit = 10 } positions, err := e.nofxosClient.GetOILowPositions() if err != nil { return nil, err } var candidates []CandidateCoin for i, pos := range positions { if i >= limit { break } symbol := market.Normalize(pos.Symbol) candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"oi_low"}, }) } return candidates, nil } // getHyperAllCoins returns all available Hyperliquid perpetual coins func (e *StrategyEngine) getHyperAllCoins() ([]CandidateCoin, error) { ctx := context.Background() symbols, err := hyperliquid.GetAllCoinSymbols(ctx) if err != nil { return nil, fmt.Errorf("failed to get Hyperliquid coins: %w", err) } var candidates []CandidateCoin for _, symbol := range symbols { // Add USDT suffix for compatibility normalizedSymbol := market.Normalize(symbol + "USDT") candidates = append(candidates, CandidateCoin{ Symbol: normalizedSymbol, Sources: []string{"hyper_all"}, }) } logger.Infof("✅ Loaded %d Hyperliquid coins (hyper_all)", len(candidates)) return candidates, nil } // getHyperMainCoins returns top N Hyperliquid coins by 24h volume func (e *StrategyEngine) getHyperMainCoins(limit int) ([]CandidateCoin, error) { if limit <= 0 { limit = 20 } ctx := context.Background() symbols, err := hyperliquid.GetMainCoinSymbols(ctx, limit) if err != nil { return nil, fmt.Errorf("failed to get Hyperliquid main coins: %w", err) } var candidates []CandidateCoin for _, symbol := range symbols { // Add USDT suffix for compatibility normalizedSymbol := market.Normalize(symbol + "USDT") candidates = append(candidates, CandidateCoin{ Symbol: normalizedSymbol, Sources: 
[]string{"hyper_main"}, }) } logger.Infof("✅ Loaded %d Hyperliquid main coins (hyper_main) by 24h volume", len(candidates)) return candidates, nil } func clampHyperRankLimit(limit int) int { if limit <= 0 { return 5 } if limit > 10 { return 10 } return limit } func (e *StrategyEngine) getHyperRankCoins(category, direction string, limit int) ([]CandidateCoin, error) { category = strings.ToLower(strings.TrimSpace(category)) if category == "" { category = "stock" } direction = strings.ToLower(strings.TrimSpace(direction)) if direction == "" { direction = "gainers" } limit = clampHyperRankLimit(limit) ctx := context.Background() var ranked []struct { symbol string info hyperliquid.CoinInfo cat string } if category == "crypto" || category == "all" { coins, err := hyperliquid.GetPerpDexCoins(ctx, "") if err != nil { return nil, fmt.Errorf("failed to get Hyperliquid crypto ranking: %w", err) } for _, coin := range coins { ranked = append(ranked, struct { symbol string info hyperliquid.CoinInfo cat string }{symbol: market.Normalize(coin.Symbol + "USDT"), info: coin, cat: "crypto"}) } } if category != "crypto" { coins, err := hyperliquid.GetPerpDexCoins(ctx, "xyz") if err != nil { return nil, fmt.Errorf("failed to get Hyperliquid XYZ ranking: %w", err) } for _, coin := range coins { base := strings.TrimPrefix(coin.Symbol, "xyz:") cat := hyperliquid.XYZCategory(base) if category != "all" && cat != category { continue } ranked = append(ranked, struct { symbol string info hyperliquid.CoinInfo cat string }{symbol: hyperliquid.FormatCoinForAPI("xyz:" + base), info: coin, cat: cat}) } } sort.SliceStable(ranked, func(i, j int) bool { switch direction { case "losers": return ranked[i].info.Change24hPct < ranked[j].info.Change24hPct case "volume": return ranked[i].info.Volume24h > ranked[j].info.Volume24h default: return ranked[i].info.Change24hPct > ranked[j].info.Change24hPct } }) if len(ranked) > limit { ranked = ranked[:limit] } candidates := make([]CandidateCoin, 0, 
len(ranked)) source := fmt.Sprintf("hyper_rank_%s_%s", category, direction) for _, item := range ranked { candidates = append(candidates, CandidateCoin{Symbol: item.symbol, Sources: []string{source}}) } logger.Infof("✅ Loaded %d Hyperliquid rank coins (%s/%s, capped at %d)", len(candidates), category, direction, limit) return candidates, nil } func (e *StrategyEngine) getVergexSignalCoins(limit int, marketType, chain, liqBand, category string, selectedSymbols []string) ([]CandidateCoin, error) { if e.vergexClient == nil { return nil, fmt.Errorf("vergex signal source requires a configured claw402 wallet") } if marketType == "" { marketType = vergex.DefaultMarketType } chain = vergex.QueryChain(chain) if limit <= 0 { limit = 5 } if limit > store.MaxCandidateCoins { limit = store.MaxCandidateCoins } category = strings.ToLower(strings.TrimSpace(category)) ranking, err := e.vergexClient.GetSignalRanking(context.Background(), vergex.Query{ Chain: chain, LiqBand: liqBand, }) if err != nil { return nil, fmt.Errorf("failed to fetch Vergex signal ranking: %w", err) } rankedItems := vergex.FilterSignalRankingItems(ranking.Items, marketType, store.MaxCandidateCoins) if len(rankedItems) == 0 && strings.TrimSpace(chain) != "" { fallbackRanking, fallbackErr := e.vergexClient.GetSignalRanking(context.Background(), vergex.Query{ LiqBand: liqBand, }) if fallbackErr == nil { fallbackItems := vergex.FilterSignalRankingItems(fallbackRanking.Items, marketType, store.MaxCandidateCoins) if len(fallbackItems) > 0 { logger.Infof("✅ Vergex signal ranking returned TradeFi items after retrying without chain filter (chain=%s)", chain) ranking = fallbackRanking rankedItems = fallbackItems } } else { logger.Warnf("⚠️ Vergex signal ranking retry without chain failed: %v", fallbackErr) } } e.vergexRankingCache = make(map[string]*vergex.SignalRankItem, len(rankedItems)) for _, item := range rankedItems { itemCopy := item if symbol := vergex.TradableSymbolForMarket(item.MarketType, item.Symbol); 
symbol != "" { e.vergexRankingCache[symbol] = &itemCopy } } if len(selectedSymbols) > 0 { candidates := make([]CandidateCoin, 0, minInt(len(selectedSymbols), limit)) seen := make(map[string]bool) for _, raw := range selectedSymbols { symbol := vergex.TradableSymbolForMarket(marketType, raw) if symbol == "" || seen[symbol] { continue } candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"vergex_signal"}, }) seen[symbol] = true if len(candidates) >= limit { break } } if len(candidates) == 0 { return nil, fmt.Errorf("selected Claw402 symbols are not tradable %s items", marketType) } logger.Infof("✅ Loaded %d selected Vergex candidates (%s)", len(candidates), marketType) return candidates, nil } items := make([]vergex.SignalRankItem, 0, limit) for _, item := range rankedItems { if category != "" && category != "all" && item.Category != category { continue } items = append(items, item) if len(items) >= limit { break } } if len(items) == 0 { if category != "" && category != "all" { return nil, fmt.Errorf("vergex signal ranking returned no tradable %s items in category %s", marketType, category) } return nil, fmt.Errorf("vergex signal ranking returned no tradable %s items", marketType) } candidates := make([]CandidateCoin, 0, len(items)) for _, item := range items { itemCopy := item symbol := vergex.TradableSymbolForMarket(item.MarketType, item.Symbol) if symbol == "" { continue } e.vergexRankingCache[symbol] = &itemCopy candidates = append(candidates, CandidateCoin{ Symbol: symbol, Sources: []string{"vergex_signal"}, }) } logger.Infof("✅ Loaded %d Vergex signal candidates (%s/%s, capped at %d)", len(candidates), marketType, withDefaultText(category, "all"), limit) return candidates, nil } func minInt(a, b int) int { if a < b { return a } return b } func withDefaultText(value, fallback string) string { if strings.TrimSpace(value) == "" { return fallback } return value } // 
============================================================================ // External & Quant Data // ============================================================================ // FetchMarketData fetches market data based on strategy configuration func (e *StrategyEngine) FetchMarketData(symbol string) (*market.Data, error) { return market.Get(symbol) } // FetchExternalData fetches external data sources func (e *StrategyEngine) FetchExternalData() (map[string]interface{}, error) { externalData := make(map[string]interface{}) for _, source := range e.config.Indicators.ExternalDataSources { data, err := e.fetchSingleExternalSource(source) if err != nil { logger.Infof("⚠️ Failed to fetch external data source [%s]: %v", source.Name, err) continue } externalData[source.Name] = data } return externalData, nil } func (e *StrategyEngine) fetchSingleExternalSource(source store.ExternalDataSource) (interface{}, error) { // SSRF Protection: Validate URL before making request if err := security.ValidateURL(source.URL); err != nil { return nil, fmt.Errorf("external source URL validation failed: %w", err) } timeout := time.Duration(source.RefreshSecs) * time.Second if timeout == 0 { timeout = 30 * time.Second } // Use SSRF-safe HTTP client client := security.SafeHTTPClient(timeout) req, err := http.NewRequest(source.Method, source.URL, nil) if err != nil { return nil, err } for k, v := range source.Headers { req.Header.Set(k, v) } resp, err := client.Do(req) if err != nil { return nil, err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } var result interface{} if err := json.Unmarshal(body, &result); err != nil { return nil, err } if source.DataPath != "" { result = extractJSONPath(result, source.DataPath) } return result, nil } func extractJSONPath(data interface{}, path string) interface{} { parts := strings.Split(path, ".") current := data for _, part := range parts { if m, ok := current.(map[string]interface{}); ok { current 
= m[part] } else { return nil } } return current } // FetchQuantData fetches quantitative data for a single coin func (e *StrategyEngine) FetchQuantData(symbol string) (*QuantData, error) { if !e.config.Indicators.EnableQuantData { return nil, nil } if e.usesHyperliquidNativeUniverse() || market.IsXyzDexAsset(symbol) { logger.Infof("⏭️ Skipping NofxOS quant data for Hyperliquid symbol %s; using native Hyperliquid klines/mark data only", symbol) return nil, nil } // Use nofxos client with unified API key include := "oi,price" if e.config.Indicators.EnableQuantNetflow { include = "netflow,oi,price" } nofxosData, err := e.nofxosClient.GetCoinData(symbol, include) if err != nil { return nil, fmt.Errorf("failed to fetch quant data: %w", err) } if nofxosData == nil { return nil, nil } // Convert nofxos.QuantData to kernel.QuantData quantData := &QuantData{ Symbol: nofxosData.Symbol, Price: nofxosData.Price, PriceChange: nofxosData.PriceChange, } // Convert OI data if nofxosData.OI != nil { quantData.OI = make(map[string]*OIData) for exchange, oiData := range nofxosData.OI { if oiData != nil { kData := &OIData{ CurrentOI: oiData.CurrentOI, } if oiData.Delta != nil { kData.Delta = make(map[string]*OIDeltaData) for dur, delta := range oiData.Delta { if delta != nil { kData.Delta[dur] = &OIDeltaData{ OIDelta: delta.OIDelta, OIDeltaValue: delta.OIDeltaValue, OIDeltaPercent: delta.OIDeltaPercent, } } } } quantData.OI[exchange] = kData } } } // Convert Netflow data if nofxosData.Netflow != nil { quantData.Netflow = &NetflowData{} if nofxosData.Netflow.Institution != nil { quantData.Netflow.Institution = &FlowTypeData{ Future: nofxosData.Netflow.Institution.Future, Spot: nofxosData.Netflow.Institution.Spot, } } if nofxosData.Netflow.Personal != nil { quantData.Netflow.Personal = &FlowTypeData{ Future: nofxosData.Netflow.Personal.Future, Spot: nofxosData.Netflow.Personal.Spot, } } } return quantData, nil } // FetchQuantDataBatch batch fetches quantitative data func (e 
*StrategyEngine) FetchQuantDataBatch(symbols []string) map[string]*QuantData {
	result := make(map[string]*QuantData)
	if !e.config.Indicators.EnableQuantData {
		return result
	}
	for _, symbol := range symbols {
		data, err := e.FetchQuantData(symbol)
		if err != nil {
			logger.Infof("⚠️ Failed to fetch quantitative data for %s: %v", symbol, err)
			continue
		}
		if data != nil {
			result[symbol] = data
		}
	}
	return result
}

// FetchVergexDataBatch fetches Vergex detail data (signal-lab and heatmap) for
// the given symbols and returns the analyses keyed by lookup symbol.
//
// An empty map is returned when the engine or its config is nil, when the coin
// source type is not "vergex_signal", or when no Vergex client is configured.
// A nil ctx is replaced with context.Background().
//
// Symbols are normalized with vergexDetailSymbolForLookup, de-duplicated, and
// capped at store.MaxCandidateCoins+store.MaxPositions entries. At most
// vergexDetailSymbolConcurrency symbols are fetched at the same time. A symbol
// is included in the result only if it produced data, an error message, or has
// a cached ranking entry.
func (e *StrategyEngine) FetchVergexDataBatch(ctx context.Context, symbols []string) map[string]*vergex.MarketAnalysis {
	result := make(map[string]*vergex.MarketAnalysis)
	if e == nil || e.config == nil || e.config.CoinSource.SourceType != "vergex_signal" {
		return result
	}
	if e.vergexClient == nil {
		logger.Warnf("⚠️ Vergex signal data skipped: claw402 wallet is not configured")
		return result
	}
	if ctx == nil {
		ctx = context.Background()
	}

	source := e.config.CoinSource
	marketType := source.VergexMarketType
	if marketType == "" {
		marketType = vergex.DefaultMarketType
	}
	chain := vergex.QueryChain(source.VergexChain)

	// The same bound is used for the preallocated capacity and for the loop
	// cut-off so the slice never has to grow.
	maxSymbols := store.MaxCandidateCoins + store.MaxPositions
	seen := make(map[string]bool)
	limited := make([]string, 0, maxSymbols)
	for _, symbol := range symbols {
		symbol = vergexDetailSymbolForLookup(marketType, symbol)
		if symbol == "" {
			continue
		}
		if seen[symbol] {
			continue
		}
		seen[symbol] = true
		limited = append(limited, symbol)
		if len(limited) >= maxSymbols {
			break
		}
	}

	type vergexAnalysisResult struct {
		symbol   string
		analysis *vergex.MarketAnalysis
	}

	// resultCh is buffered for one entry per symbol, so workers never block on
	// send and the channel can be drained after all of them have finished.
	resultCh := make(chan vergexAnalysisResult, len(limited))
	var wg sync.WaitGroup
	// sem bounds the number of symbols being fetched concurrently.
	sem := make(chan struct{}, vergexDetailSymbolConcurrency)

	for _, symbol := range limited {
		symbol := symbol
		querySymbol := vergex.QuerySymbol(symbol)
		if querySymbol == "" {
			continue
		}

		// Prefer the market type and category recorded in the ranking cache
		// for this symbol, when present.
		itemMarketType := marketType
		itemCategory := ""
		var ranking *vergex.SignalRankItem
		if cached, ok := e.vergexRankingCache[symbol]; ok && cached != nil {
			ranking = cached
			if cached.MarketType != "" {
				itemMarketType = cached.MarketType
			}
			itemCategory = cached.Category
		}

		analysis := &vergex.MarketAnalysis{
			Symbol:      symbol,
			QuerySymbol: querySymbol,
			MarketType:  itemMarketType,
			Ranking:     ranking,
		}
		query := vergex.Query{
			MarketType: itemMarketType,
			Symbol:     symbol,
			Chain:      chain,
			LiqBand:    source.VergexLiqBand,
			Category:   itemCategory,
		}

		wg.Add(1)
		go func() {
			defer wg.Done()

			select {
			case sem <- struct{}{}:
				defer func() { <-sem }()
			case <-ctx.Done():
				// Cancelled before a slot became available: record the
				// context error for both endpoints and skip the fetch.
				analysis.SignalLabError = ctx.Err().Error()
				analysis.HeatmapError = ctx.Err().Error()
				resultCh <- vergexAnalysisResult{symbol: symbol, analysis: analysis}
				return
			}

			e.populateVergexDetailData(ctx, analysis, query)
			if len(analysis.SignalLab) > 0 ||
				len(analysis.Heatmap) > 0 ||
				analysis.SignalLabError != "" ||
				analysis.HeatmapError != "" ||
				analysis.Ranking != nil {
				resultCh <- vergexAnalysisResult{symbol: symbol, analysis: analysis}
			}
		}()
	}

	wg.Wait()
	close(resultCh)
	for item := range resultCh {
		result[item.symbol] = item.analysis
	}

	logger.Infof("📊 Vergex detail data ready for %d symbols", len(result))
	return result
}

// vergexDetailSymbolForLookup maps a symbol to the form used for Vergex detail
// lookups by delegating to vergex.TradableSymbolForMarket.
func vergexDetailSymbolForLookup(marketType, symbol string) string {
	return vergex.TradableSymbolForMarket(marketType, symbol)
}

const (
	// vergexDetailRequestTimeout is the timeout applied to each Vergex detail
	// endpoint fetch (including its fallback attempts).
	vergexDetailRequestTimeout = 45 * time.Second
	// vergexDetailSymbolConcurrency is the maximum number of symbols whose
	// detail data is fetched at the same time.
	vergexDetailSymbolConcurrency = 2
)

// populateVergexDetailData fetches the signal-lab and heatmap payloads for
// query in parallel and stores them on analysis. For each endpoint, a failure
// is logged and its message stored in the matching *Error field; on success
// the raw body is stored instead.
func (e *StrategyEngine) populateVergexDetailData(ctx context.Context, analysis *vergex.MarketAnalysis, query vergex.Query) {
	type endpointResult struct {
		name string
		body json.RawMessage
		err  error
	}

	// run executes one endpoint fetch under its own timeout and reports the
	// outcome on out.
	run := func(name string, fetch func(context.Context, vergex.Query) (json.RawMessage, error), out chan<- endpointResult) {
		requestCtx, cancel := context.WithTimeout(ctx, vergexDetailRequestTimeout)
		defer cancel()
		body, err := fetch(requestCtx, query)
		out <- endpointResult{name: name, body: body, err: err}
	}

	// Buffered for both endpoints so neither goroutine blocks on send.
	out := make(chan endpointResult, 2)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		run("signal-lab", e.fetchVergexSignalLabWithFallback, out)
	}()
	go func() {
		defer wg.Done()
		run("heatmap", e.fetchVergexHeatmapWithFallback, out)
	}()
	wg.Wait()
	close(out)

	for item := range out {
		switch item.name {
		case "signal-lab":
			if item.err != nil {
				logger.Warnf("⚠️ Failed to fetch Vergex signal-lab for %s: %v", analysis.Symbol, item.err)
				analysis.SignalLabError = item.err.Error()
			} else {
				analysis.SignalLab = item.body
			}
		case "heatmap":
			if item.err != nil {
				logger.Warnf("⚠️ Failed to fetch Vergex heatmap for %s: %v", analysis.Symbol, item.err)
				analysis.HeatmapError = item.err.Error()
			} else {
				analysis.Heatmap = item.body
			}
		}
	}
}

// fetchVergexSignalLabWithFallback requests signal-lab data, trying each query
// produced by vergexDetailQueryCandidates in order. It returns the first
// successful body. It stops early on an error that
// isRetryableVergexDetailError does not classify as retryable, and returns the
// last error seen when no candidate succeeds.
func (e *StrategyEngine) fetchVergexSignalLabWithFallback(ctx context.Context, query vergex.Query) (json.RawMessage, error) {
	var lastErr error
	for idx, candidate := range vergexDetailQueryCandidates(query) {
		body, err := e.vergexClient.GetSignalLab(ctx, candidate)
		if err == nil {
			if idx > 0 {
				logger.Infof("✅ Vergex signal-lab succeeded with fallback marketType=%s chain=%s", candidate.MarketType, withDefaultText(candidate.Chain, "default"))
			}
			return body, nil
		}
		lastErr = err
		if !isRetryableVergexDetailError(err) {
			break
		}
	}
	return nil, lastErr
}

// fetchVergexHeatmapWithFallback requests cost/liquidation heatmap data, trying
// each query produced by vergexDetailQueryCandidates in order. It returns the
// first successful body. It stops early on an error that
// isRetryableVergexDetailError does not classify as retryable, and returns the
// last error seen when no candidate succeeds.
func (e *StrategyEngine) fetchVergexHeatmapWithFallback(ctx context.Context, query vergex.Query) (json.RawMessage, error) {
	var lastErr error
	for idx, candidate := range vergexDetailQueryCandidates(query) {
		body, err := e.vergexClient.GetCostLiquidationHeatmap(ctx, candidate)
		if err == nil {
			if idx > 0 {
				logger.Infof("✅ Vergex heatmap succeeded with fallback marketType=%s chain=%s", candidate.MarketType, withDefaultText(candidate.Chain, "default"))
			}
			return body, nil
		}
		lastErr = err
		if !isRetryableVergexDetailError(err) {
			break
		}
	}
	return nil, lastErr
}

// vergexDetailQueryCandidates expands query into the ordered list of queries
// to attempt: every candidate market type combined with every candidate chain
// (the query's own chain, "mainnet", and the empty chain, de-duplicated). All
// other query fields are copied unchanged.
func vergexDetailQueryCandidates(query vergex.Query) []vergex.Query {
	marketTypes := vergexDetailMarketTypeCandidates(query)
	chains := uniqueValues(query.Chain, "mainnet", "")
	candidates := make([]vergex.Query, 0, len(marketTypes)*len(chains))
	for _, marketType := range marketTypes {
		for _, chain := range chains {
			candidate := query
			candidate.MarketType = marketType
			candidate.Chain = chain
			candidates = append(candidates, candidate)
		}
	}
	return candidates
}

// vergexDetailMarketTypeCandidates returns the ordered, de-duplicated market
// types to try for query. When the query's market type is one of the
// "all markets" aliases, the order depends on whether market.IsXyzDexAsset
// reports true for the symbol; otherwise the query's own market type is tried
// first, followed by the known alternatives.
func vergexDetailMarketTypeCandidates(query vergex.Query) []string {
	if isVergexAllMarketType(query.MarketType) {
		if market.IsXyzDexAsset(query.Symbol) {
			return uniqueNonEmpty(vergex.DefaultMarketType, "hip3-perp", "hip3Perp", "core_perp")
		}
		return uniqueNonEmpty("core_perp", vergex.DefaultMarketType, "hip3-perp", "hip3Perp")
	}
	values := []string{query.MarketType, vergex.DefaultMarketType, "hip3-perp", "hip3Perp", "core_perp"}
	return uniqueNonEmpty(values...)
}

// isVergexAllMarketType reports whether marketType, after trimming whitespace
// and lower-casing, is empty or one of the aliases listed below rather than a
// specific market type.
func isVergexAllMarketType(marketType string) bool {
	switch strings.ToLower(strings.TrimSpace(marketType)) {
	case "", "all", "any", "ranking", "signal-ranking", "signal_ranking", "claw402", "vergex":
		return true
	default:
		return false
	}
}

// isRetryableVergexDetailError reports whether err's message (compared
// case-insensitively) contains one of the substrings for which another
// market type / chain candidate should be attempted. A nil error is not
// retryable.
func isRetryableVergexDetailError(err error) bool {
	if err == nil {
		return false
	}
	msg := strings.ToLower(err.Error())
	return strings.Contains(msg, "invalid markettype") ||
		strings.Contains(msg, "invalid_request") ||
		strings.Contains(msg, "invalid chain") ||
		strings.Contains(msg, "market not found") ||
		strings.Contains(msg, "not_found")
}

// uniqueNonEmpty returns values with surrounding whitespace trimmed, empty
// entries dropped, and duplicates removed, preserving first-seen order.
func uniqueNonEmpty(values ...string) []string {
	out := make([]string, 0, len(values))
	seen := make(map[string]bool, len(values))
	for _, value := range values {
		value = strings.TrimSpace(value)
		if value == "" || seen[value] {
			continue
		}
		seen[value] = true
		out = append(out, value)
	}
	return out
}

// uniqueValues returns values with surrounding whitespace trimmed and
// duplicates removed, preserving first-seen order. Unlike uniqueNonEmpty, an
// empty string is kept (once).
func uniqueValues(values ...string) []string {
	out := make([]string, 0, len(values))
	seen := make(map[string]bool, len(values))
	for _, value := range values {
		value = strings.TrimSpace(value)
		if seen[value] {
			continue
		}
		seen[value] = true
		out = append(out, value)
	}
	return out
}

// FetchOIRankingData fetches market-wide OI ranking data.
//
// It returns nil when OI ranking is disabled, when the engine uses the native
// Hyperliquid universe, when the request fails, or when the client returns no
// data. Duration defaults to "1h" and limit to 10 when not configured.
func (e *StrategyEngine) FetchOIRankingData() *nofxos.OIRankingData {
	indicators := e.config.Indicators
	if !indicators.EnableOIRanking {
		return nil
	}
	if e.usesHyperliquidNativeUniverse() {
		logger.Infof("⏭️ Skipping NofxOS OI ranking for Hyperliquid strategy; native Hyperliquid universe is the source of truth")
		return nil
	}

	duration := indicators.OIRankingDuration
	if duration == "" {
		duration = "1h"
	}
	limit := indicators.OIRankingLimit
	if limit <= 0 {
		limit = 10
	}

	logger.Infof("📊 Fetching OI ranking data (duration: %s, limit: %d)", duration, limit)
	data, err := e.nofxosClient.GetOIRanking(duration, limit)
	if err != nil {
		logger.Warnf("⚠️ Failed to fetch OI ranking data: %v", err)
		return nil
	}
	// Guard against a nil result with a nil error so the summary log below
	// cannot dereference a nil pointer.
	if data == nil {
		logger.Warnf("⚠️ OI ranking data unavailable: empty response")
		return nil
	}

	logger.Infof("✓ OI ranking data ready: %d top, %d low positions", len(data.TopPositions), len(data.LowPositions))
	return data
}

// FetchNetFlowRankingData fetches market-wide NetFlow ranking data.
//
// It returns nil when NetFlow ranking is disabled, when the engine uses the
// native Hyperliquid universe, when the request fails, or when the client
// returns no data. Duration defaults to "1h" and limit to 10 when not
// configured.
func (e *StrategyEngine) FetchNetFlowRankingData() *nofxos.NetFlowRankingData {
	indicators := e.config.Indicators
	if !indicators.EnableNetFlowRanking {
		return nil
	}
	if e.usesHyperliquidNativeUniverse() {
		logger.Infof("⏭️ Skipping NofxOS netflow ranking for Hyperliquid strategy; native Hyperliquid universe is the source of truth")
		return nil
	}

	duration := indicators.NetFlowRankingDuration
	if duration == "" {
		duration = "1h"
	}
	limit := indicators.NetFlowRankingLimit
	if limit <= 0 {
		limit = 10
	}

	logger.Infof("💰 Fetching NetFlow ranking data (duration: %s, limit: %d)", duration, limit)
	data, err := e.nofxosClient.GetNetFlowRanking(duration, limit)
	if err != nil {
		logger.Warnf("⚠️ Failed to fetch NetFlow ranking data: %v", err)
		return nil
	}
	// Guard against a nil result with a nil error so the summary log below
	// cannot dereference a nil pointer.
	if data == nil {
		logger.Warnf("⚠️ NetFlow ranking data unavailable: empty response")
		return nil
	}

	logger.Infof("✓ NetFlow ranking data ready: inst_in=%d, inst_out=%d, retail_in=%d, retail_out=%d",
		len(data.InstitutionFutureTop), len(data.InstitutionFutureLow),
		len(data.PersonalFutureTop), len(data.PersonalFutureLow))
	return data
}

// FetchPriceRankingData fetches market-wide price ranking data (gainers/losers).
//
// It returns nil when price ranking is disabled, when the engine uses the
// native Hyperliquid universe, when the request fails, or when the client
// returns no data. Durations defaults to "1h" and limit to 10 when not
// configured.
func (e *StrategyEngine) FetchPriceRankingData() *nofxos.PriceRankingData {
	indicators := e.config.Indicators
	if !indicators.EnablePriceRanking {
		return nil
	}
	if e.usesHyperliquidNativeUniverse() {
		logger.Infof("⏭️ Skipping NofxOS price ranking for Hyperliquid strategy; native Hyperliquid universe is the source of truth")
		return nil
	}

	durations := indicators.PriceRankingDuration
	if durations == "" {
		durations = "1h"
	}
	limit := indicators.PriceRankingLimit
	if limit <= 0 {
		limit = 10
	}

	logger.Infof("📈 Fetching Price ranking data (durations: %s, limit: %d)", durations, limit)
	data, err := e.nofxosClient.GetPriceRanking(durations, limit)
	if err != nil {
		logger.Warnf("⚠️ Failed to fetch Price ranking data: %v", err)
		return nil
	}
	// Guard against a nil result with a nil error so the summary log below
	// cannot dereference a nil pointer.
	if data == nil {
		logger.Warnf("⚠️ Price ranking data unavailable: empty response")
		return nil
	}

	logger.Infof("✓ Price ranking data ready for %d durations", len(data.Durations))
	return data
}

// ============================================================================
// Helper Functions
// ============================================================================

// detectLanguage detects the language from text content.
// It returns LangChinese if text contains at least one rune in the range
// U+4E00..U+9FFF, otherwise LangEnglish.
func detectLanguage(text string) Language {
	for _, r := range text {
		if r >= 0x4E00 && r <= 0x9FFF {
			return LangChinese
		}
	}
	return LangEnglish
}