diff --git a/config.json.example b/config.json.example index 87b01edd..ac9d5ac6 100644 --- a/config.json.example +++ b/config.json.example @@ -5,7 +5,6 @@ "altcoin_leverage": 5 }, "use_default_coins": true, - "inside_coins": true, "default_coins": [ "BTCUSDT", "ETHUSDT", diff --git a/config/config.go b/config/config.go index 430e428c..37a537db 100644 --- a/config/config.go +++ b/config/config.go @@ -54,7 +54,6 @@ type LeverageConfig struct { type Config struct { Traders []TraderConfig `json:"traders"` UseDefaultCoins bool `json:"use_default_coins"` // 是否使用默认主流币种列表 - UseInsideCoins bool `json:"use_inside_coins"` // 是否使用内置AI评分币种列表 DefaultCoins []string `json:"default_coins"` // 默认主流币种池 APIServerPort int `json:"api_server_port"` MaxDailyLoss float64 `json:"max_daily_loss"` diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 2c1a2f6f..fb233a31 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -51,7 +51,12 @@ nofx/ │ ├── market/ # Market data fetching │ └── data.go # Market data & technical indicators (TA-Lib) -│ +│ └── api_client.go # Market data acquisition API +│ └── websocket_client.go # Market data acquisition WebSocket interface +│ └── combined_streams.go # Market data acquisition: Combined streaming (single link to subscribe to multiple cryptocurrencies) +│ └── monitor.go # Market data cache +│ └── types.go # market structure + ├── pool/ # Coin pool management │ └── coin_pool.go # AI500 + OI Top merged pool │ diff --git a/docs/architecture/README.zh-CN.md b/docs/architecture/README.zh-CN.md index 36732b09..4acc0f90 100644 --- a/docs/architecture/README.zh-CN.md +++ b/docs/architecture/README.zh-CN.md @@ -51,6 +51,11 @@ nofx/ │ ├── market/ # 市场数据获取 │ └── data.go # 市场数据与技术指标(TA-Lib) +│ └── api_client.go # 行情获取 Api接口 +│ └── websocket_client.go # 行情获取 Websocket接口 +│ └── combined_streams.go # 行情获取 组合流式(单链接订阅多个币种) +│ └── monitor.go # 行情数据缓存 +│ └── types.go # market结构体 │ ├── pool/ # 币种池管理 │ └── coin_pool.go # AI500 + OI Top 合并池 diff --git a/go.mod b/go.mod index 0c6dcfde..067172fd 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,8 @@ require ( github.com/gin-gonic/gin v1.11.0 github.com/golang-jwt/jwt/v5 v5.2.0 github.com/google/uuid v1.6.0 - github.com/mattn/go-sqlite3 v1.14.32 + github.com/gorilla/websocket v1.5.3 + github.com/mattn/go-sqlite3 v1.14.16 github.com/pquerna/otp v1.4.0 github.com/sonirico/go-hyperliquid v0.17.0 golang.org/x/crypto v0.42.0 @@ -26,6 +27,7 @@ require ( github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/go-sysinfo v1.15.4 // indirect github.com/elastic/go-windows v1.0.2 // indirect github.com/ethereum/c-kzg-4844/v2 v2.1.5 // indirect @@ -37,7 +39,6 @@ require ( github.com/go-playground/validator/v10 v10.27.0 // indirect github.com/goccy/go-json v0.10.4 // indirect github.com/goccy/go-yaml v1.18.0 // indirect - github.com/gorilla/websocket v1.5.3 // indirect github.com/holiman/uint256 v1.3.2 // indirect github.com/joho/godotenv v1.5.1 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -55,6 +56,7 @@ require ( github.com/prometheus/procfs v0.17.0 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.54.0 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rs/zerolog v1.34.0 // indirect github.com/shopspring/decimal v1.4.0 // indirect github.com/sonirico/vago v0.9.0 // indirect @@ -78,4 +80,8 @@ require ( golang.org/x/tools v0.36.0 // indirect google.golang.org/protobuf v1.36.9 // indirect howett.net/plist v1.0.1 // indirect + modernc.org/libc v1.37.6 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/sqlite v1.28.0 // indirect ) diff --git a/main.go b/main.go index 1ffc562e..36537b50 100644 --- a/main.go +++ b/main.go @@ -265,6 +265,7 @@ func main() { // 启动流行情数据 - 默认使用所有交易员设置的币种 如果没有设置币种 则优先使用系统默认 go market.NewWSMonitor(150).Start(database.GetCustomCoins()) + //go market.NewWSMonitor(150).Start([]string{}) //这里是一个使用方式 传入空的话 则使用market市场的所有币种 // 设置优雅退出 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM) diff --git a/market/feature_engine.go b/market/feature_engine.go deleted file mode 100644 index 91540a29..00000000 --- a/market/feature_engine.go +++ /dev/null @@ -1,229 +0,0 @@ -package market - -import ( - "fmt" - "math" - "time" -) - -type FeatureEngine struct { - alertThresholds AlertThresholds -} - -func NewFeatureEngine(thresholds AlertThresholds) *FeatureEngine { - return &FeatureEngine{ - alertThresholds: thresholds, - } -} - -func (e *FeatureEngine) CalculateFeatures(symbol string, klines []Kline) *SymbolFeatures { - if len(klines) < 20 { - return nil - } - - features := &SymbolFeatures{ - Symbol: symbol, - Timestamp: time.Now(), - } - - // 提取价格和交易量数据 - closes := make([]float64, len(klines)) - volumes := make([]float64, len(klines)) - highs := make([]float64, len(klines)) - lows := make([]float64, len(klines)) - - for i, k := range klines { - closes[i] = k.Close - volumes[i] = k.Volume - highs[i] = k.High - lows[i] = k.Low - } - - // 价格特征 - features.Price = closes[len(closes)-1] - features.PriceChange15Min = (closes[len(closes)-1] - closes[len(closes)-2]) / closes[len(closes)-2] - - if len(closes) >= 5 { - features.PriceChange1H = (closes[len(closes)-1] - closes[len(closes)-5]) / closes[len(closes)-5] - } - if len(closes) >= 17 { - features.PriceChange4H = (closes[len(closes)-1] - closes[len(closes)-17]) / closes[len(closes)-17] - } - - // 交易量特征 - currentVolume := volumes[len(volumes)-1] - features.Volume = currentVolume - - // 5周期平均交易量 - if len(volumes) >= 6 { - avgVolume5 := e.calculateAverage(volumes[len(volumes)-6 : len(volumes)-1]) - features.VolumeRatio5 = currentVolume / avgVolume5 - } - - // 20周期平均交易量 - if len(volumes) >= 21 { - avgVolume20 := e.calculateAverage(volumes[len(volumes)-21 : len(volumes)-1]) - features.VolumeRatio20 = currentVolume / avgVolume20 - } - - // 交易量趋势 - if features.VolumeRatio20 > 0 { - features.VolumeTrend = features.VolumeRatio5 / features.VolumeRatio20 - } - - // 技术指标 - features.RSI14 = e.calculateRSI(closes, 14) - features.SMA5 = e.calculateSMA(closes, 5) - features.SMA10 = e.calculateSMA(closes, 10) - features.SMA20 = e.calculateSMA(closes, 20) - - // 波动特征 - currentHigh := highs[len(highs)-1] - currentLow := lows[len(lows)-1] - features.HighLowRatio = (currentHigh - currentLow) / features.Price - features.Volatility20 = e.calculateVolatility(closes, 20) - - // 价格在区间中的位置 - if currentHigh != currentLow { - features.PositionInRange = (features.Price - currentLow) / (currentHigh - currentLow) - } else { - features.PositionInRange = 0.5 - } - - return features -} - -func (e *FeatureEngine) calculateAverage(values []float64) float64 { - sum := 0.0 - for _, v := range values { - sum += v - } - return sum / float64(len(values)) -} - -func (e *FeatureEngine) calculateSMA(prices []float64, period int) float64 { - if len(prices) < period { - return 0 - } - return e.calculateAverage(prices[len(prices)-period:]) -} - -func (e *FeatureEngine) calculateRSI(prices []float64, period int) float64 { - if len(prices) <= period { - return 50 - } - - gains := make([]float64, 0) - losses := make([]float64, 0) - - for i := 1; i < len(prices); i++ { - change := prices[i] - prices[i-1] - if change > 0 { - gains = append(gains, change) - losses = append(losses, 0) - } else { - gains = append(gains, 0) - losses = append(losses, -change) - } - } - - // 只取最近period个数据点 - if len(gains) > period { - gains = gains[len(gains)-period:] - losses = losses[len(losses)-period:] - } - - avgGain := e.calculateAverage(gains) - avgLoss := e.calculateAverage(losses) - - if avgLoss == 0 { - return 100 - } - - rs := avgGain / avgLoss - return 100 - (100 / (1 + rs)) -} - -func (e *FeatureEngine) calculateVolatility(prices []float64, period int) float64 { - if len(prices) < period { - return 0 - } - - periodPrices := prices[len(prices)-period:] - mean := e.calculateAverage(periodPrices) - - variance := 0.0 - for _, price := range periodPrices { - variance += math.Pow(price-mean, 2) - } - variance /= float64(len(periodPrices)) - - return math.Sqrt(variance) / mean -} - -func (e *FeatureEngine) DetectAlerts(features *SymbolFeatures) []Alert { - var alerts []Alert - - // 交易量放大检测 - if features.VolumeRatio5 > e.alertThresholds.VolumeSpike { - alerts = append(alerts, Alert{ - Type: "VOLUME_SPIKE", - Symbol: features.Symbol, - Value: features.VolumeRatio5, - Threshold: e.alertThresholds.VolumeSpike, - Message: fmt.Sprintf("%s 交易量放大 %.2f 倍", features.Symbol, features.VolumeRatio5), - Timestamp: time.Now(), - }) - } - - // 15分钟价格异动 - if math.Abs(features.PriceChange15Min) > e.alertThresholds.PriceChange15Min { - direction := "上涨" - if features.PriceChange15Min < 0 { - direction = "下跌" - } - alerts = append(alerts, Alert{ - Type: "PRICE_CHANGE_15MIN", - Symbol: features.Symbol, - Value: features.PriceChange15Min, - Threshold: e.alertThresholds.PriceChange15Min, - Message: fmt.Sprintf("%s 15分钟%s %.2f%%", features.Symbol, direction, features.PriceChange15Min*100), - Timestamp: time.Now(), - }) - } - - // 交易量趋势 - if features.VolumeTrend > e.alertThresholds.VolumeTrend { - alerts = append(alerts, Alert{ - Type: "VOLUME_TREND", - Symbol: features.Symbol, - Value: features.VolumeTrend, - Threshold: e.alertThresholds.VolumeTrend, - Message: fmt.Sprintf("%s 交易量趋势增强 %.2f 倍", features.Symbol, features.VolumeTrend), - Timestamp: time.Now(), - }) - } - - // RSI超买超卖 - if features.RSI14 > e.alertThresholds.RSIOverbought { - alerts = append(alerts, Alert{ - Type: "RSI_OVERBOUGHT", - Symbol: features.Symbol, - Value: features.RSI14, - Threshold: e.alertThresholds.RSIOverbought, - Message: fmt.Sprintf("%s RSI超买: %.2f", features.Symbol, features.RSI14), - Timestamp: time.Now(), - }) - } else if features.RSI14 < e.alertThresholds.RSIOversold { - alerts = append(alerts, Alert{ - Type: "RSI_OVERSOLD", - Symbol: features.Symbol, - Value: features.RSI14, - Threshold: e.alertThresholds.RSIOversold, - Message: fmt.Sprintf("%s RSI超卖: %.2f", features.Symbol, features.RSI14), - Timestamp: time.Now(), - }) - } - - return alerts -} diff --git a/market/monitor.go b/market/monitor.go index b751dfe8..337640d8 100644 --- a/market/monitor.go +++ b/market/monitor.go @@ -4,8 +4,6 @@ import ( "encoding/json" "fmt" "log" - "math" - "sort" "strings" "sync" "time" @@ -14,7 +12,6 @@ import ( type WSMonitor struct { wsClient *WSClient combinedClient *CombinedStreamsClient - featureEngine *FeatureEngine symbols []string featuresMap sync.Map alertsChan chan Alert @@ -41,7 +38,6 @@ func NewWSMonitor(batchSize int) *WSMonitor { WSMonitorCli = &WSMonitor{ wsClient: NewWSClient(), combinedClient: NewCombinedStreamsClient(batchSize), - featureEngine: NewFeatureEngine(config.AlertThresholds), alertsChan: make(chan Alert, 1000), batchSize: batchSize, } @@ -63,6 +59,7 @@ func (m *WSMonitor) Initialize(coins []string) error { for _, symbol := range exchangeInfo.Symbols { if symbol.Status == "TRADING" && symbol.ContractType == "PERPETUAL" && strings.ToUpper(symbol.Symbol[len(symbol.Symbol)-4:]) == "USDT" { m.symbols = append(m.symbols, symbol.Symbol) + m.filterSymbols.Store(symbol.Symbol, true) } } } else { @@ -133,12 +130,6 @@ func (m *WSMonitor) Start(coins []string) { log.Fatalf("❌ 批量订阅流: %v", err) return } - // 启动警报处理器 - //go m.handleAlerts() - // 启动定期清理任务 - //go m.cleanupInactiveSymbols() - // 输出监控统计 - 评分前十名 - //go m.printFilterStats(20) // 订阅所有交易对 err = m.subscribeAll() if err != nil { @@ -239,60 +230,6 @@ func (m *WSMonitor) processKlineUpdate(symbol string, wsData KlineWSData, _time } klineDataMap.Store(symbol, klines) - // 计算特征并检测警报 - if len(klines) >= 20 { - features := m.featureEngine.CalculateFeatures(symbol, klines) - if features != nil { - m.featuresMap.Store(symbol, features) - - alerts := m.featureEngine.DetectAlerts(features) - hasAlert := len(alerts) > 0 - - // 更新统计信息 - m.updateSymbolStats(symbol, features, hasAlert) - - for _, alert := range alerts { - m.alertsChan <- alert - } - - // 实时日志输出重要特征 - if len(alerts) > 0 || features.VolumeRatio5 > 2.0 || math.Abs(features.PriceChange15Min) > 0.02 { - //log.Printf("📊 %s - 价格: %.4f, 15分钟变动: %.2f%%, 交易量倍数: %.2f, RSI: %.1f", - // symbol, features.Price, features.PriceChange15Min*100, - // features.VolumeRatio5, features.RSI14) - } - } - } -} - -func (m *WSMonitor) processTickerUpdate(symbol string, tickerData TickerWSData) { - // 存储ticker数据 - m.tickerDataMap.Store(symbol, tickerData) -} - -func (m *WSMonitor) handleAlerts() { - alertCounts := make(map[string]int) - lastReset := time.Now() - - for alert := range m.alertsChan { - // 重置计数器(每小时) - if time.Since(lastReset) > time.Hour { - alertCounts = make(map[string]int) - lastReset = time.Now() - } - - // 警报去重和频率控制 - alertKey := fmt.Sprintf("%s_%s", alert.Symbol, alert.Type) - alertCounts[alertKey]++ - m.filterSymbols.Store(alert.Symbol, true) - - //log.Printf("✅ 自动添加监控: %s (因警报: %s)", alert.Symbol, alert.Message) - if alertCounts[alertKey] <= 3 { // 每小时最多3次相同警报 - //log.Printf("🚨 实时警报: %s", alert.Message) - - // 这里可以添加其他警报处理逻辑 - } - } } func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, error) { @@ -317,204 +254,7 @@ func (m *WSMonitor) GetCurrentKlines(symbol string, _time string) ([]Kline, erro return value.([]Kline), nil } -func (m *WSMonitor) GetCurrentFeatures(symbol string) (*SymbolFeatures, bool) { - value, exists := m.featuresMap.Load(symbol) - if !exists { - return nil, false - } - return value.(*SymbolFeatures), true -} - -func (m *WSMonitor) GetAllFeatures() map[string]*SymbolFeatures { - features := make(map[string]*SymbolFeatures) - m.featuresMap.Range(func(key, value interface{}) bool { - features[key.(string)] = value.(*SymbolFeatures) - return true - }) - return features -} - func (m *WSMonitor) Close() { m.wsClient.Close() close(m.alertsChan) } -func (m *WSMonitor) printFilterStats(nember int) { - ticker := time.NewTicker(2 * time.Minute) - defer ticker.Stop() - - for range ticker.C { - var monitoredSymbols []string - m.filterSymbols.Range(func(key, value interface{}) bool { - monitoredSymbols = append(monitoredSymbols, key.(string)) - return true - }) - - log.Printf("🎯 监控统计 - 总数: %d, 币种: %v", - len(monitoredSymbols), monitoredSymbols) - - // 打印前5个评分最高的币种 - type symbolScore struct { - symbol string - score float64 - } - var topScores []symbolScore - - m.symbolStats.Range(func(key, value interface{}) bool { - symbol := key.(string) - stats := value.(*SymbolStats) - topScores = append(topScores, symbolScore{symbol, stats.Score}) - return true - }) - - // 按评分排序 - sort.Slice(topScores, func(i, j int) bool { - return topScores[i].score > topScores[j].score - }) - m.FilterSymbol = nil - if len(topScores) > 0 { - log.Printf("🏆 评分TOP%v:", nember) - for i := 0; i < len(topScores) && i < nember; i++ { - m.FilterSymbol = append(m.FilterSymbol, topScores[i].symbol) - log.Printf(" %d. %s: %.1f分", i+1, topScores[i].symbol, topScores[i].score) - } - } - } -} - -// evaluateSymbolScore 评估币种得分,决定是否保留 -func (m *WSMonitor) evaluateSymbolScore(symbol string, features *SymbolFeatures) float64 { - score := 0.0 - - // 交易量活跃度评分 (权重: 40%) - if features.VolumeRatio5 > 1.5 { - score += 40 * math.Min(features.VolumeRatio5/5.0, 1.0) - } - - // 价格波动评分 (权重: 30%) - volatilityScore := math.Abs(features.PriceChange15Min) * 1000 // 放大系数 - score += 30 * math.Min(volatilityScore/10.0, 1.0) // 最大10%波动得满分 - - // RSI活跃度评分 (权重: 20%) - if features.RSI14 < 30 || features.RSI14 > 70 { - score += 20 // RSI在极端区域 - } else if features.RSI14 < 40 || features.RSI14 > 60 { - score += 10 // RSI在活跃区域 - } - - // 交易量趋势评分 (权重: 10%) - if features.VolumeTrend > 1.2 { - score += 10 * math.Min(features.VolumeTrend/3.0, 1.0) - } - - return score -} - -// shouldRemoveFromFilter 判断是否应该从FilterSymbols中移除 -func (m *WSMonitor) shouldRemoveFromFilter(symbol string) bool { - value, exists := m.symbolStats.Load(symbol) - if !exists { - return true // 没有统计信息,移除 - } - - stats := value.(*SymbolStats) - - // 规则1: 超过30分钟没有活跃迹象 - if time.Since(stats.LastActiveTime) > 30*time.Minute { - log.Printf("🔻 %s 因长时间不活跃被移除", symbol) - return true - } - - // 规则2: 评分持续低于阈值 (最近5次评分平均) - if stats.Score < 15 { // 调整这个阈值 - log.Printf("🔻 %s 因评分过低(%.1f)被移除", symbol, stats.Score) - return true - } - - // 规则3: 超过2小时没有产生警报 - if time.Since(stats.LastAlertTime) > 2*time.Hour && stats.AlertCount > 0 { - log.Printf("🔻 %s 因长时间无新警报被移除", symbol) - return true - } - - return false -} - -// updateSymbolStats 更新币种统计信息 -func (m *WSMonitor) updateSymbolStats(symbol string, features *SymbolFeatures, hasAlert bool) { - now := time.Now() - - value, exists := m.symbolStats.Load(symbol) - var stats *SymbolStats - - if !exists { - stats = &SymbolStats{ - LastActiveTime: now, - Score: m.evaluateSymbolScore(symbol, features), - } - } else { - stats = value.(*SymbolStats) - stats.LastActiveTime = now - - // 平滑更新评分 (指数移动平均) - newScore := m.evaluateSymbolScore(symbol, features) - stats.Score = 0.7*stats.Score + 0.3*newScore - } - - if hasAlert { - stats.AlertCount++ - stats.LastAlertTime = now - } - - if features.VolumeRatio5 > 2.0 { - stats.VolumeSpikeCount++ - } - - m.symbolStats.Store(symbol, stats) -} - -// removeFromFilter 从FilterSymbols中移除币种 -func (m *WSMonitor) removeFromFilter(symbol string) { - - // 从filterSymbols中移除 - m.filterSymbols.Delete(symbol) - m.symbolStats.Delete(symbol) - - log.Printf("🗑️ 已移除币种监控: %s", symbol) -} - -// cleanupInactiveSymbols 定期清理不活跃的币种 -func (m *WSMonitor) cleanupInactiveSymbols() { - ticker := time.NewTicker(5 * time.Minute) // 每5分钟检查一次 - defer ticker.Stop() - - for range ticker.C { - var symbolsToRemove []string - - // 收集需要移除的币种 - m.filterSymbols.Range(func(key, value interface{}) bool { - symbol := key.(string) - if m.shouldRemoveFromFilter(symbol) { - symbolsToRemove = append(symbolsToRemove, symbol) - } - return true - }) - - // 执行移除操作 - for _, symbol := range symbolsToRemove { - m.removeFromFilter(symbol) - } - - if len(symbolsToRemove) > 0 { - log.Printf("🧹 清理完成,移除了 %d 个不活跃币种", len(symbolsToRemove)) - } - } -} - -// getSymbolScore 获取币种当前评分 -func (m *WSMonitor) getSymbolScore(symbol string) float64 { - value, exists := m.symbolStats.Load(symbol) - if !exists { - return 0 - } - return value.(*SymbolStats).Score -}